Repository: apex-malhar
Updated Branches:
  refs/heads/master 623b803f5 -> e22ea0de1


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
index e2a94ef..29cd079 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.testbench.EventGenerator;
 import java.util.ArrayList;
 import java.util.HashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.testbench.EventGenerator;
+
 /**
  * HashMap Input Operator used as a helper in testbench benchmarking apps.
  *
@@ -36,11 +38,15 @@ public class HashMapOperator implements InputOperator
 {
   private String keys = null;
   private static final Logger logger = LoggerFactory.getLogger(EventGenerator.class);
-  private String[] keysArray = {"a","b","c","d"};
-  public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data = new DefaultOutputPort<HashMap<String, Double>>();
-  public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data = new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>();
-  public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data = new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>();
-  public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data = new DefaultOutputPort<HashMap<String, Integer>>();
+  private String[] keysArray = {"a", "b", "c", "d"};
+  public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data =
+      new DefaultOutputPort<HashMap<String, Double>>();
+  public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data =
+      new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>();
+  public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data =
+      new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>();
+  public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data =
+      new DefaultOutputPort<HashMap<String, Integer>>();
   private int numTuples = 1000;
   private String seed = "a";
   private int numKeys = 2;
@@ -89,7 +95,7 @@ public class HashMapOperator implements InputOperator
         for (int j = 0; j < numKeys; j++) {
           hmapMapTemp.put(keysArray[j], 100 * j);
         }
-         for (int j = 0; j < numKeys; j++) {
+        for (int j = 0; j < numKeys; j++) {
           hmapMap.put(keysArray[j], hmapMapTemp);
         }
         hmapMap_data.emit(hmapMap);
@@ -107,7 +113,7 @@ public class HashMapOperator implements InputOperator
     }
 
     if (hmapInt_data.isConnected()) {
-        for (int i = 0; i < numTuples; i++) {
+      for (int i = 0; i < numTuples; i++) {
         HashMap<String, Integer> hmapMapTemp = new HashMap<String, Integer>();
         for (int j = 0; j < numKeys; j++) {
           hmapMapTemp.put(keysArray[j], 100 * j);
@@ -120,7 +126,8 @@ public class HashMapOperator implements InputOperator
   @Override
   public void beginWindow(long windowId)
   {
-    // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    // throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
@@ -132,13 +139,15 @@ public class HashMapOperator implements InputOperator
   @Override
   public void setup(OperatorContext context)
   {
-    // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    // throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
   @Override
   public void teardown()
   {
-    // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
+    // throw new UnsupportedOperationException("Not supported yet.");
+    // To change body of generated methods, choose Tools | Templates.
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
index da757d9..df5b11e 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for RandomEventGenerator Operator.
@@ -37,14 +38,15 @@ public class RandomEventGeneratorApp implements StreamingApplication
 {
   private final Locality locality = null;
   public static final int QUEUE_CAPACITY = 16 * 1024;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     RandomEventGenerator random = dag.addOperator("random", new RandomEventGenerator());
     DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull());
     DevNull<String> dev2 = dag.addOperator("dev2", new DevNull());
-    dag.addStream("random1",random.integer_data,dev1.data).setLocality(locality);
-    dag.addStream("random2",random.string_data,dev2.data).setLocality(locality);
+    dag.addStream("random1", random.integer_data, dev1.data).setLocality(locality);
+    dag.addStream("random2", random.string_data, dev2.data).setLocality(locality);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
index db18937..faafcbf 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java
@@ -18,6 +18,10 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
@@ -26,9 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.SeedEventGenerator;
 import com.datatorrent.lib.util.KeyValPair;
-import java.util.ArrayList;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
+
 
 /**
  * Benchmark App for SeedEventGenerator Operator.
@@ -55,10 +57,12 @@ public class SeedEventGeneratorApp implements 
StreamingApplication
     DevNull<HashMap<String, String>> devVal = dag.addOperator("devVal", new 
DevNull<HashMap<String, String>>());
     DevNull<HashMap<String, ArrayList<Integer>>> devList = 
dag.addOperator("devList", new DevNull());
 
-    
dag.getMeta(seedEvent).getMeta(seedEvent.string_data).getAttributes().put(PortContext.QUEUE_CAPACITY,
 QUEUE_CAPACITY);
+    dag.getMeta(seedEvent).getMeta(seedEvent.string_data)
+        .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("SeedEventGeneratorString", seedEvent.string_data, 
devString.data).setLocality(locality);
 
-    
dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes().put(PortContext.QUEUE_CAPACITY,
 QUEUE_CAPACITY);
+    dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes()
+        .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
     dag.addStream("SeedEventGeneratorKeyVal", seedEvent.keyvalpair_list, 
devKeyVal.data).setLocality(locality);
 
     
dag.getMeta(seedEvent).getMeta(seedEvent.val_data).getAttributes().put(PortContext.QUEUE_CAPACITY,
 QUEUE_CAPACITY);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
index 5b66b54..d6e762e 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java
@@ -18,14 +18,16 @@
  */
 package com.datatorrent.benchmark.testbench;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.stream.DevNull;
 import com.datatorrent.lib.testbench.ThroughputCounter;
-import java.util.HashMap;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Benchmark App for ThroughputCounter Operator.
@@ -38,14 +40,15 @@ public class ThroughputCounterApp implements 
StreamingApplication
 {
   public static final int QUEUE_CAPACITY = 16 * 1024;
   private final Locality locality = null;
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
     ThroughputCounter counter = dag.addOperator("counter", new 
ThroughputCounter());
     HashMapOperator oper = dag.addOperator("oper", new HashMapOperator());
-    DevNull<HashMap<String,Number>> dev = dag.addOperator("dev", new 
DevNull());
-    
dag.addStream("count1",oper.hmapInt_data,counter.data).setLocality(locality);
-    dag.addStream("count2",counter.count,dev.data).setLocality(locality);
+    DevNull<HashMap<String, Number>> dev = dag.addOperator("dev", new 
DevNull());
+    dag.addStream("count1", oper.hmapInt_data, 
counter.data).setLocality(locality);
+    dag.addStream("count2", counter.count, dev.data).setLocality(locality);
 
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
index 64af9f9..7250e74 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java
@@ -50,7 +50,8 @@ import 
com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGen
 import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.lib.stream.DevNull;
 
-public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, 
O extends AbstractWindowedOperator> implements StreamingApplication
+public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, 
O extends AbstractWindowedOperator>
+    implements StreamingApplication
 {
   protected static final String PROP_STORE_PATH = 
"dt.application.WindowedOperatorBenchmark.storeBasePath";
   protected static final String DEFAULT_BASE_PATH = 
"WindowedOperatorBenchmark/Store";
@@ -80,7 +81,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G 
extends Operator, O
 
 //    WatermarkGenerator watermarkGenerator = new WatermarkGenerator();
 //    dag.addOperator("WatermarkGenerator", watermarkGenerator);
-//    dag.addStream("Control", watermarkGenerator.control, 
windowedOperator.controlInput).setLocality(Locality.CONTAINER_LOCAL);
+//    dag.addStream("Control", watermarkGenerator.control, 
windowedOperator.controlInput)
+//      .setLocality(Locality.CONTAINER_LOCAL);
 
     DevNull output = dag.addOperator("output", new DevNull());
     dag.addStream("output", windowedOperator.output, 
output.data).setLocality(Locality.CONTAINER_LOCAL);
@@ -112,7 +114,8 @@ public abstract class 
AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
       windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS));
       windowedOperator.setWindowOption(new 
WindowOption.TimeWindows(Duration.standardMinutes(1)));
       //accumulating mode
-      
windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
+      windowedOperator.setTriggerOption(TriggerOption.AtWatermark()
+          
.withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes());
       windowedOperator.setFixedWatermark(30000);
       //windowedOperator.setTriggerOption(TriggerOption.AtWatermark());
 
@@ -153,7 +156,6 @@ public abstract class 
AbstractWindowedOperatorBenchmarkApp<G extends Operator, O
     return basePath;
   }
 
-
   public static class TestStatsListener implements StatsListener, Serializable
   {
     private static final Logger LOG = 
LoggerFactory.getLogger(TestStatsListener.class);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
index 5a9c955..19df8fd 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java
@@ -41,7 +41,8 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.lib.fileaccess.TFileImpl;
 import com.datatorrent.lib.util.KeyValPair;
 
-public class KeyedWindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator,
 KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
+public class KeyedWindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorBenchmarkApp<
+    KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, 
KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator>
 {
   public KeyedWindowedOperatorBenchmarkApp()
   {
@@ -58,7 +59,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorB
   }
 
   @Override
-  protected void setUpdatedKeyStorage(MyKeyedWindowedOperator 
windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl)
+  protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator,
+      Configuration conf, SpillableComplexComponentImpl sccImpl)
   {
     windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, 
sccImpl));
   }
@@ -107,7 +109,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorB
     }
   }
 
-  protected static class KeyedWindowedGenerator extends 
AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
+  protected static class KeyedWindowedGenerator extends
+      AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>>
   {
     @Override
     protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
index 98275ce..d96b453 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java
@@ -35,7 +35,8 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 @ApplicationAnnotation(name = "WindowedOperatorBenchmark")
-public class WindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator,
 WindowedOperatorBenchmarkApp.MyWindowedOperator>
+public class WindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorBenchmarkApp<
+    WindowedOperatorBenchmarkApp.WindowedGenerator, 
WindowedOperatorBenchmarkApp.MyWindowedOperator>
 {
   public WindowedOperatorBenchmarkApp()
   {
@@ -49,7 +50,8 @@ public class WindowedOperatorBenchmarkApp extends 
AbstractWindowedOperatorBenchm
     @Override
     protected TimestampedTuple<Long> generateNextTuple()
     {
-      return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - 
random.nextInt(120000), (long)random.nextInt(100));
+      return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - 
random.nextInt(120000),
+          (long)random.nextInt(100));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java 
b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
index 1d76855..cd8a3ec 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java
@@ -18,16 +18,17 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Context.PortContext;
-
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+
 /**
  * Test the DAG declaration in local mode.
  */
@@ -40,8 +41,10 @@ public class ApplicationFixedTest
     new ApplicationFixed().populateDAG(lma.getDAG(), new Configuration(false));
 
     DAG dag = lma.cloneDAG();
-    FixedTuplesInputOperator wordGenerator = 
(FixedTuplesInputOperator)dag.getOperatorMeta("WordGenerator").getOperator();
-    Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, 
(int)dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY));
+    FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag
+        .getOperatorMeta("WordGenerator").getOperator();
+    Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, 
(int)dag.getMeta(wordGenerator)
+        .getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY));
 
     LocalMode.Controller lc = lma.getController();
     lc.run(60000);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java 
b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
index 9439243..0a21a7c 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.Benchmark;
 
 /**
  * Test the DAG declaration in local mode.
@@ -38,7 +37,7 @@ public class BenchmarkTest
   {
     for (final Locality l : Locality.values()) {
       logger.debug("Running the with {} locality", l);
-      LocalMode.runApp(new Benchmark.AbstractApplication ()
+      LocalMode.runApp(new Benchmark.AbstractApplication()
       {
         @Override
         public Locality getLocality()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java 
b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
index 7c9f892..6a1c968 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java
@@ -18,14 +18,17 @@
  */
 package com.datatorrent.benchmark;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-import org.junit.Test;
+
+import com.datatorrent.api.LocalMode;
 
 public class CouchBaseBenchmarkTest
 {
@@ -52,8 +55,7 @@ public class CouchBaseBenchmarkTest
       LocalMode.Controller lc = lm.getController();
       //lc.setHeartbeatMonitoringEnabled(false);
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       logger.info(ex.getCause());
     }
     is.close();
@@ -76,8 +78,7 @@ public class CouchBaseBenchmarkTest
       lm.prepareDAG(new CouchBaseAppInput(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       logger.info(ex.getCause());
     }
     is.close();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java 
b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
index 5e407a0..e2936fe 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java
@@ -27,6 +27,7 @@ import 
com.datatorrent.contrib.accumulo.AbstractAccumuloOutputOperator;
 import com.datatorrent.contrib.accumulo.AccumuloRowTupleGenerator;
 import com.datatorrent.contrib.accumulo.AccumuloTestHelper;
 import com.datatorrent.contrib.accumulo.AccumuloTuple;
+
 /**
  * BenchMark Results
  * -----------------
@@ -39,14 +40,16 @@ import com.datatorrent.contrib.accumulo.AccumuloTuple;
  *
  * @since 1.0.4
  */
-public class AccumuloApp implements StreamingApplication {
+public class AccumuloApp implements StreamingApplication
+{
 
   @Override
-  public void populateDAG(DAG dag, Configuration conf) {
+  public void populateDAG(DAG dag, Configuration conf)
+  {
     AccumuloTestHelper.getConnector();
     AccumuloTestHelper.clearTable();
     dag.setAttribute(DAG.APPLICATION_NAME, "AccumuloOutputTest");
-    AccumuloRowTupleGenerator rtg = 
dag.addOperator("tuplegenerator",AccumuloRowTupleGenerator.class);
+    AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator", 
AccumuloRowTupleGenerator.class);
     TestAccumuloOutputOperator taop = dag.addOperator("testaccumulooperator", 
TestAccumuloOutputOperator.class);
     dag.addStream("ss", rtg.outputPort, taop.input);
     com.datatorrent.api.Attribute.AttributeMap attributes = 
dag.getAttributes();
@@ -58,12 +61,14 @@ public class AccumuloApp implements StreamingApplication {
 
   }
 
-  public static class TestAccumuloOutputOperator extends 
AbstractAccumuloOutputOperator<AccumuloTuple> {
+  public static class TestAccumuloOutputOperator extends 
AbstractAccumuloOutputOperator<AccumuloTuple>
+  {
 
     @Override
-    public Mutation operationMutation(AccumuloTuple t) {
+    public Mutation operationMutation(AccumuloTuple t)
+    {
       Mutation mutation = new Mutation(t.getRow().getBytes());
-      
mutation.put(t.getColFamily().getBytes(),t.getColName().getBytes(),t.getColValue().getBytes());
+      mutation.put(t.getColFamily().getBytes(), t.getColName().getBytes(), 
t.getColValue().getBytes());
       return mutation;
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
index 8f0a1fd..8b47a9b 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java
@@ -22,9 +22,11 @@ import org.junit.Test;
 
 import com.datatorrent.api.LocalMode;
 
-public class AccumuloAppTest {
+public class AccumuloAppTest
+{
   @Test
-  public void testSomeMethod() throws Exception {
+  public void testSomeMethod() throws Exception
+  {
     LocalMode.runApp(new AccumuloApp(), 30000);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
index 8f2e19f..14fe441 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java
@@ -21,12 +21,13 @@ package com.datatorrent.benchmark.aerospike;
 import org.junit.Test;
 
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.aerospike.AerospikeOutputBenchmarkApplication;
 
-public class AerospikeBenchmarkAppTest {
+public class AerospikeBenchmarkAppTest
+{
 
   @Test
-  public void test() throws Exception {
+  public void test() throws Exception
+  {
 
     LocalMode.runApp(new AerospikeOutputBenchmarkApplication(), 10000);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
index c54cbdf..079d073 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java
@@ -18,10 +18,12 @@
  */
 package com.datatorrent.benchmark.algo;
 
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Test the DAG declaration in local mode.
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
index e85e38c..ec4f308 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java
@@ -19,16 +19,18 @@
 package com.datatorrent.benchmark.cassandra;
 
 import org.junit.Test;
+
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.benchmark.cassandra.CassandraOutputBenchmarkApplication;
 
 /**
  * Test the DAG declaration in local mode.
  */
-public class CassandraApplicatonTest {
+public class CassandraApplicatonTest
+{
 
   @Test
-  public void test() throws Exception {
+  public void test() throws Exception
+  {
     LocalMode.runApp(new CassandraOutputBenchmarkApplication(), 10000);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
index 1658ab1..32a4907 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java
@@ -19,16 +19,19 @@
 package com.datatorrent.benchmark.hbase;
 
 import org.junit.Test;
+
 import com.datatorrent.api.LocalMode;
 
 /**
  * Test the DAG declaration in local mode.
  */
-public class HBaseApplicationTest {
+public class HBaseApplicationTest
+{
 
   @Test
-  public void test() throws Exception {
-     LocalMode.runApp(new HBaseCsvMappingApplication(), 20000);
+  public void test() throws Exception
+  {
+    LocalMode.runApp(new HBaseCsvMappingApplication(), 20000);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
index 562559f..b61f1d3 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java
@@ -37,7 +37,7 @@ import com.datatorrent.contrib.hbase.HBaseRowStringGenerator;
  *
  * @since 1.0.4
  */
-@ApplicationAnnotation(name="HBaseBenchmarkApp")
+@ApplicationAnnotation(name = "HBaseBenchmarkApp")
 public class HBaseCsvMappingApplication implements StreamingApplication
 {
   private final Locality locality = null;
@@ -47,15 +47,12 @@ public class HBaseCsvMappingApplication implements 
StreamingApplication
   {
     HBaseRowStringGenerator row = dag.addOperator("rand", new 
HBaseRowStringGenerator());
 
-
     HBaseCsvMappingPutOperator csvMappingPutOperator = 
dag.addOperator("HBaseoper", new HBaseCsvMappingPutOperator());
     csvMappingPutOperator.getStore().setTableName("table1");
     csvMappingPutOperator.getStore().setZookeeperQuorum("127.0.0.1");
     csvMappingPutOperator.getStore().setZookeeperClientPort(2181);
     
csvMappingPutOperator.setMappingString("colfam0.street,colfam0.city,colfam0.state,row");
-    dag.addStream("hbasestream",row.outputPort, 
csvMappingPutOperator.input).setLocality(locality);
+    dag.addStream("hbasestream", row.outputPort, 
csvMappingPutOperator.input).setLocality(locality);
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
index fdab095..653c6f6 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java
@@ -18,18 +18,21 @@
  */
 package com.datatorrent.benchmark.hive;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.netlet.util.DTThrowable;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.sql.SQLException;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.netlet.util.DTThrowable;
+
 public class HiveInsertBenchmarkTest
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveInsertBenchmarkTest.class);
@@ -41,26 +44,30 @@ public class HiveInsertBenchmarkTest
     InputStream inputStream = null;
     try {
       inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml");
-    }
-    catch (FileNotFoundException ex) {
-      LOG.debug("Exception caught",ex);
+    } catch (FileNotFoundException ex) {
+      LOG.debug("Exception caught", ex);
     }
     conf.addResource(inputStream);
 
-    LOG.debug("conf properties are {}" , 
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
-    LOG.debug("conf dburl is {}" , 
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    LOG.debug("conf filepath is {}" , 
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
-    LOG.debug("maximum length is {}" , 
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength"));
-    LOG.debug("tablename is {}" , 
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
-    LOG.debug("permission is 
{}",conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission"));
+    LOG.debug("conf properties are {}",
+        
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    LOG.debug("conf dburl is {}",
+        
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
+    LOG.debug("conf filepath is {}",
+        
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
+    LOG.debug("maximum length is {}",
+        
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength"));
+    LOG.debug("tablename is {}",
+        
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+    LOG.debug("permission is {}",
+        
conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission"));
     HiveInsertBenchmarkingApp app = new HiveInsertBenchmarkingApp();
     LocalMode lm = LocalMode.newInstance();
     try {
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(120000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
index 3acba3e..e0097c6 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.benchmark.hive;
 
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.sql.SQLException;
 
@@ -28,16 +30,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 
-
 import com.datatorrent.api.LocalMode;
-
 import com.datatorrent.netlet.util.DTThrowable;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 
 public class HiveMapBenchmarkTest
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveMapBenchmarkTest.class);
+
   @Test
   public void testMethod() throws SQLException
   {
@@ -45,18 +44,24 @@ public class HiveMapBenchmarkTest
     InputStream inputStream = null;
     try {
       inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml");
-    }
-    catch (FileNotFoundException ex) {
-      LOG.debug("Exception caught {}",ex);
+    } catch (FileNotFoundException ex) {
+      LOG.debug("Exception caught {}", ex);
     }
     conf.addResource(inputStream);
-    LOG.debug("conf properties are {}" , 
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
-    LOG.debug("conf dburl is {}" , 
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
-    LOG.debug("conf filepath is {}" , 
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
-    LOG.debug("maximum length is {}" , 
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength"));
-    LOG.debug("tablename is {}" , 
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"));
-    LOG.debug("permission is 
{}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission"));
-    LOG.debug("delimiter is 
{}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter"));
+    LOG.debug("conf properties are {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties"));
+    LOG.debug("conf dburl is {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl"));
+    LOG.debug("conf filepath is {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath"));
+    LOG.debug("maximum length is {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength"));
+    LOG.debug("tablename is {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"));
+    LOG.debug("permission is {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission"));
+    LOG.debug("delimiter is {}",
+        
conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter"));
 
     HiveMapInsertBenchmarkingApp app = new HiveMapInsertBenchmarkingApp();
     LocalMode lm = LocalMode.newInstance();
@@ -64,14 +69,11 @@ public class HiveMapBenchmarkTest
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(30000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 
     IOUtils.closeQuietly(inputStream);
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
index e2d2f6a..6cb901a 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
@@ -43,8 +43,7 @@ public class KafkaInputBenchmarkTest
       lma.prepareDAG(new KafkaInputBenchmark(), conf);
       LocalMode.Controller lc = lma.getController();
       lc.run(30000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
index d85372f..4de7193 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
 
-
 public class KafkaOutputBenchmarkTest
 {
   @Test
@@ -44,8 +43,7 @@ public class KafkaOutputBenchmarkTest
       lma.prepareDAG(new KafkaInputBenchmark(), conf);
       LocalMode.Controller lc = lma.getController();
       lc.run(30000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
index 4c046fb..9201cd5 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java
@@ -18,13 +18,16 @@
  */
 package com.datatorrent.benchmark.memsql;
 
-import com.datatorrent.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.memsql.MemsqlInputOperator;
 import com.datatorrent.lib.stream.DevNull;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * BenchMark Results
@@ -39,7 +42,7 @@ import org.slf4j.LoggerFactory;
  *
  * @since 1.0.5
  */
-@ApplicationAnnotation(name="MemsqlInputBenchmark")
+@ApplicationAnnotation(name = "MemsqlInputBenchmark")
 public class MemsqlInputBenchmark implements StreamingApplication
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(MemsqlInputBenchmark.class);
@@ -50,13 +53,13 @@ public class MemsqlInputBenchmark implements 
StreamingApplication
   public void populateDAG(DAG dag, Configuration conf)
   {
     MemsqlInputOperator memsqlInputOperator = 
dag.addOperator("memsqlInputOperator",
-                                                                new 
MemsqlInputOperator());
+        new MemsqlInputOperator());
 
     DevNull<Object> devNull = dag.addOperator("devnull",
-                                      new DevNull<Object>());
+        new DevNull<Object>());
 
     dag.addStream("memsqlconnector",
-                  memsqlInputOperator.outputPort,
-                  devNull.data);
+        memsqlInputOperator.outputPort,
+        devNull.data);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
index 55fec7c..fa98a18 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java
@@ -18,26 +18,33 @@
  */
 package com.datatorrent.benchmark.memsql;
 
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.Operator.ProcessingMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.contrib.memsql.*;
-import static 
com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE;
-import static 
com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.sql.SQLException;
 import java.util.Random;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
+import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
+import com.datatorrent.contrib.memsql.MemsqlStore;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.netlet.util.DTThrowable;
+
+import static 
com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE;
+import static 
com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID;
+import static 
com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID;
+
 public class MemsqlInputBenchmarkTest
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(MemsqlInputBenchmarkTest.class);
@@ -52,28 +59,33 @@ public class MemsqlInputBenchmarkTest
 
     MemsqlStore memsqlStore = new MemsqlStore();
     memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl"));
-    
memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
+    memsqlStore.setConnectionProperties(
+        
conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
 
     AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
 
     MemsqlPOJOOutputOperator outputOperator = new MemsqlPOJOOutputOperator();
-    
outputOperator.getStore().setDatabaseUrl(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
-    
outputOperator.getStore().setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
+    outputOperator.getStore().setDatabaseUrl(
+        
conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl"));
+    outputOperator.getStore().setConnectionProperties(
+        
conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties"));
     outputOperator.setBatchSize(BATCH_SIZE);
 
     Random random = new Random();
-    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap = new 
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap 
attributeMap =
+        new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
     attributeMap.put(OperatorContext.PROCESSING_MODE, 
ProcessingMode.AT_LEAST_ONCE);
     attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
     attributeMap.put(DAG.APPLICATION_ID, APP_ID);
-    OperatorContextTestHelper.TestIdOperatorContext context = new 
OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+    OperatorContextTestHelper.TestIdOperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, 
attributeMap);
 
     long seedSize = conf.getLong("dt.seedSize", SEED_SIZE);
 
     outputOperator.setup(context);
     outputOperator.beginWindow(0);
 
-    for(long valueCounter = 0;
+    for (long valueCounter = 0;
         valueCounter < seedSize;
         valueCounter++) {
       outputOperator.input.put(random.nextInt());
@@ -89,8 +101,7 @@ public class MemsqlInputBenchmarkTest
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
index 8534e20..297bc6d 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.benchmark.memsql;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator;
 import com.datatorrent.lib.testbench.RandomEventGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * BenchMark Results
@@ -41,10 +43,10 @@ import org.slf4j.LoggerFactory;
  *
  * @since 1.0.5
  */
-@ApplicationAnnotation(name="MemsqlOutputBenchmark")
+@ApplicationAnnotation(name = "MemsqlOutputBenchmark")
 public class MemsqlOutputBenchmark implements StreamingApplication
 {
-  private static transient final Logger LOG = 
LoggerFactory.getLogger(MemsqlOutputBenchmark.class);
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(MemsqlOutputBenchmark.class);
 
   public static final int DEFAULT_BATCH_SIZE = 1000;
   public static final int MAX_WINDOW_COUNT = 10000;
@@ -61,7 +63,7 @@ public class MemsqlOutputBenchmark implements 
StreamingApplication
     @Override
     public void emitTuples()
     {
-      if(done) {
+      if (done) {
         return;
       }
 
@@ -73,8 +75,7 @@ public class MemsqlOutputBenchmark implements 
StreamingApplication
     {
       try {
         super.endWindow();
-      }
-      catch(Exception e) {
+      } catch (Exception e) {
         done = true;
       }
     }
@@ -83,20 +84,21 @@ public class MemsqlOutputBenchmark implements 
StreamingApplication
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    CustomRandomEventGenerator randomEventGenerator = 
dag.addOperator("randomEventGenerator", new CustomRandomEventGenerator());
+    CustomRandomEventGenerator randomEventGenerator = dag.addOperator(
+        "randomEventGenerator", new CustomRandomEventGenerator());
     randomEventGenerator.setMaxCountOfWindows(MAX_WINDOW_COUNT);
     randomEventGenerator.setTuplesBlastIntervalMillis(TUPLE_BLAST_MILLIS);
     randomEventGenerator.setTuplesBlast(TUPLE_BLAST);
 
     LOG.debug("Before making output operator");
     MemsqlPOJOOutputOperator memsqlOutputOperator = 
dag.addOperator("memsqlOutputOperator",
-                                                                new 
MemsqlPOJOOutputOperator());
+        new MemsqlPOJOOutputOperator());
     LOG.debug("After making output operator");
 
     memsqlOutputOperator.setBatchSize(DEFAULT_BATCH_SIZE);
 
     dag.addStream("memsqlConnector",
-                  randomEventGenerator.integer_data,
-                  memsqlOutputOperator.input);
+        randomEventGenerator.integer_data,
+        memsqlOutputOperator.input);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
index bab0c9e..bf82ab3 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java
@@ -18,20 +18,23 @@
  */
 package com.datatorrent.benchmark.memsql;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
-import com.datatorrent.contrib.memsql.MemsqlStore;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.sql.SQLException;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest;
+import com.datatorrent.contrib.memsql.MemsqlStore;
+import com.datatorrent.netlet.util.DTThrowable;
+
 public class MemsqlOutputBenchmarkTest
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(MemsqlOutputBenchmarkTest.class);
@@ -45,7 +48,8 @@ public class MemsqlOutputBenchmarkTest
 
     MemsqlStore memsqlStore = new MemsqlStore();
     memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl"));
-    
memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties"));
+    memsqlStore.setConnectionProperties(
+        
conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties"));
 
     AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore);
 
@@ -56,8 +60,7 @@ public class MemsqlOutputBenchmarkTest
       lm.prepareDAG(app, conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
+    } catch (Exception ex) {
       DTThrowable.rethrow(ex);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
index 9f82e79..d270e7f 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java
@@ -18,14 +18,17 @@
  */
 package com.datatorrent.benchmark.script;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for Ruby Operator in local mode.
  */
-public class RubyOperatorBenchmarkAppTest {
+public class RubyOperatorBenchmarkAppTest
+{
 
   @Test
   public void testApplication() throws Exception

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
index 7e64c5f..b87fec1 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -35,7 +35,6 @@ import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.datatorrent.lib.fileaccess.TFileImpl;
 
-
 public class SpillableDSBenchmarkTest
 {
   private static final Logger logger = 
LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
@@ -57,7 +56,6 @@ public class SpillableDSBenchmarkTest
   @Rule
   public SpillableTestUtils.TestMeta testMeta = new 
SpillableTestUtils.TestMeta();
 
-
   @Before
   public void setup()
   {
@@ -116,7 +114,8 @@ public class SpillableDSBenchmarkTest
       long spentTime = System.currentTimeMillis() - startTime;
       if (spentTime > outputTimes * 5000) {
         ++outputTimes;
-        logger.info("Total Statistics: Spent {} mills for {} operation. 
average/second: {}", spentTime, i, i * 1000 / spentTime);
+        logger.info("Total Statistics: Spent {} mills for {} operation. 
average/second: {}",
+            spentTime, i, i * 1000 / spentTime);
         checkEnvironment();
       }
     }
@@ -126,7 +125,6 @@ public class SpillableDSBenchmarkTest
         loopCount / spentTime);
   }
 
-
   public void putEntry(SpillableMapImpl<String, String> map)
   {
     map.put(keys[random.nextInt(keys.length)], 
values[random.nextInt(values.length)]);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
index 4f03a10..dc8f4b4 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -34,7 +34,6 @@ import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
 /**
  * This is not a really unit test, but in fact a benchmark runner.
  * Provides this class to give developers the convenience to run in local IDE 
environment.
- *
  */
 public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
 {
@@ -91,8 +90,6 @@ public class ManagedStateBenchmarkAppTest extends 
ManagedStateBenchmarkApp
     lc.shutdown();
   }
 
-
-
   @Override
   public String getStoreBasePath(Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
index 14fd7e3..99d8a1f 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventClassifier Operator in local mode.
  */
@@ -47,9 +50,8 @@ public class EventClassifierAppTest
       lm.prepareDAG(new EventClassifierApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
index b793ecd..929d8bc 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventClassifierNumberToHashDouble Operator in local mode.
  */
@@ -48,9 +51,8 @@ public class EventClassifierNumberToHashDoubleAppTest
       lm.prepareDAG(new EventClassifierNumberToHashDoubleApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
index 4808ff2..5a427a5 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventGenerator Operator in local mode.
  */
@@ -50,9 +53,8 @@ public class EventGeneratorAppTest
       lm.prepareDAG(new EventGeneratorApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
index 4384256..1a85a7b 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for EventIncrementerApp Operator in local mode.
  */
@@ -48,9 +51,8 @@ public class EventIncrementerAppTest
       lm.prepareDAG(new EventIncrementerApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
index e1368ba..9419022 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java
@@ -19,6 +19,8 @@
 package com.datatorrent.benchmark.testbench;
 
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.InputStream;
 
 import org.junit.Test;
@@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.LocalMode;
-import java.io.FileNotFoundException;
-import java.io.IOException;
+
 /**
  * Benchmark Test for FilterClassifierApp Operator in local mode.
  */
@@ -49,9 +50,8 @@ public class FilterClassifierAppTest
       lm.prepareDAG(new FilterClassifierApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
index 95453f2..977d6b7 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for FilteredEventClassifierApp Operator in local mode.
  */
@@ -47,9 +50,8 @@ public class FilteredEventClassifierAppTest
       lm.prepareDAG(new FilteredEventClassifierApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
index cc180f0..92ca0fd 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java
@@ -18,16 +18,19 @@
  */
 package com.datatorrent.benchmark.testbench;
 
-import com.datatorrent.api.LocalMode;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.hadoop.conf.Configuration;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
 /**
  * Benchmark Test for ThroughputCounterApp Operator in local mode.
  */
@@ -47,9 +50,8 @@ public class ThroughputCounterAppTest
       lm.prepareDAG(new ThroughputCounterApp(), conf);
       LocalMode.Controller lc = lm.getController();
       lc.run(20000);
-    }
-    catch (Exception ex) {
-       logger.info(ex.getMessage());
+    } catch (Exception ex) {
+      logger.info(ex.getMessage());
     }
     is.close();
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
index 98ecf67..157accc 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
@@ -40,7 +40,6 @@ public class GenericSerdePerformanceTest
   private Random random = new Random();
   private int serdeDataSize = 1000000;
 
-
   @Test
   public void testCompareSerdeForString()
   {
@@ -74,7 +73,6 @@ public class GenericSerdePerformanceTest
     buffer.release();
   }
 
-
   @Test
   public void testCompareSerdeForRealCase()
   {
@@ -88,7 +86,6 @@ public class GenericSerdePerformanceTest
     long genericSerdeCost = System.currentTimeMillis() - beginTime;
     logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost);
 
-
     beginTime = System.currentTimeMillis();
     Kryo kryo = new Kryo();
     for (int i = 0; i < serdeDataSize; ++i) {
@@ -99,7 +96,6 @@ public class GenericSerdePerformanceTest
     long kryoSerdeCost = System.currentTimeMillis() - beginTime;
     logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost);
 
-
     beginTime = System.currentTimeMillis();
     Kryo kryo1 = new Kryo();
     for (int i = 0; i < serdeDataSize; ++i) {
@@ -113,6 +109,7 @@ public class GenericSerdePerformanceTest
 
   protected ImmutablePair generatePair(long now)
   {
-    return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000));
+    return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100),
+        random.nextInt(100)), "" + random.nextInt(1000));
   }
 }

Reply via email to