Fixed checkstyle errors for demos.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7d9386d2
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7d9386d2
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7d9386d2

Branch: refs/heads/master
Commit: 7d9386d2abb445954055f6bf8c6d2aa75f896d77
Parents: 846b4a3
Author: MalharJenkins <[email protected]>
Authored: Fri Aug 26 13:33:43 2016 -0700
Committer: Shunxin <[email protected]>
Committed: Fri Aug 26 14:07:31 2016 -0700

----------------------------------------------------------------------
 .../distributeddistinct/CountVerifier.java      |  10 +-
 .../IntegerUniqueValueCountAppender.java        |  17 +-
 .../RandomKeyValGenerator.java                  |  14 +-
 .../UniqueValueCountAppender.java               |  22 +-
 .../distributeddistinct/ApplicationTest.java    |   3 +-
 .../DistributedDistinctTest.java                |   4 +-
 .../StatefulApplicationTest.java                |  19 +-
 .../StatefulUniqueCountTest.java                |  23 +-
 .../demos/echoserver/Application.java           |   2 +-
 .../demos/echoserver/MessageResponder.java      |   4 +-
 .../demos/echoserver/NetworkManager.java        |  47 ++-
 .../demos/echoserver/ApplicationTest.java       |   6 +-
 .../demos/frauddetect/Application.java          |   8 +-
 .../frauddetect/AverageAlertingOperator.java    |  49 ++-
 .../demos/frauddetect/BankIdNumberKey.java      |   4 +-
 .../BankIdNumberSamplerOperator.java            |  41 +--
 .../CreditCardAmountSamplerOperator.java        |  43 +--
 .../demos/frauddetect/MerchantTransaction.java  |   4 +-
 .../MerchantTransactionBucketOperator.java      |  60 ++--
 .../MerchantTransactionGenerator.java           |  22 +-
 .../MerchantTransactionInputHandler.java        |  17 +-
 .../frauddetect/SlidingWindowSumKeyVal.java     |  56 ++--
 .../frauddetect/SlidingWindowSumObject.java     |   6 +-
 .../frauddetect/TransactionStatsAggregator.java |  26 +-
 .../operator/HdfsStringOutputOperator.java      |   7 +-
 .../operator/MongoDBOutputOperator.java         | 281 +++++++++--------
 .../demos/frauddetect/util/JsonUtils.java       |  17 +-
 .../frauddetect/FrauddetectApplicationTest.java |  40 +--
 .../malhar/stream/sample/MinimalWordCount.java  |  10 +-
 .../malhar/stream/sample/WindowedWordCount.java |  66 ++--
 .../stream/sample/complete/AutoComplete.java    |  17 +-
 .../sample/complete/CompletionCandidate.java    |   2 +-
 .../stream/sample/complete/PojoEvent.java       |   6 +-
 .../sample/complete/StreamingWordExtract.java   |  36 +--
 .../sample/complete/TopWikipediaSessions.java   |  70 ++---
 .../stream/sample/complete/TrafficRoutes.java   | 132 ++++----
 .../sample/complete/TwitterAutoComplete.java    |   3 +-
 .../sample/cookbook/CombinePerKeyExamples.java  |  40 +--
 .../stream/sample/cookbook/DeDupExample.java    |  24 +-
 .../stream/sample/cookbook/InputPojo.java       |  18 +-
 .../sample/cookbook/MaxPerKeyExamples.java      |  42 +--
 .../stream/sample/cookbook/OutputPojo.java      |  10 +-
 .../stream/sample/cookbook/TriggerExample.java  |   4 +-
 .../stream/sample/MinimalWordCountTest.java     |   8 +-
 .../stream/sample/WindowedWordCountTest.java    |  10 +-
 .../sample/complete/AutoCompleteTest.java       |   8 +-
 .../complete/StreamingWordExtractTest.java      |  26 +-
 .../complete/TopWikipediaSessionsTest.java      |   8 +-
 .../sample/complete/TrafficRoutesTest.java      |   8 +-
 .../cookbook/CombinePerKeyExamplesTest.java     |   8 +-
 .../sample/cookbook/DeDupExampleTest.java       |   8 +-
 .../sample/cookbook/MaxPerKeyExamplesTest.java  |  60 ++--
 .../demos/iteration/Application.java            |  23 +-
 .../demos/iteration/ApplicationTest.java        |  14 +-
 .../demos/machinedata/Application.java          |  15 +-
 .../demos/machinedata/DimensionGenerator.java   |  18 +-
 .../demos/machinedata/InputReceiver.java        |  20 +-
 .../demos/machinedata/data/MachineKey.java      |  35 ++-
 .../demos/machinedata/data/ResourceType.java    |  54 ++--
 .../operator/CalculatorOperator.java            |  25 +-
 .../operator/MachineInfoAveragingOperator.java  |  26 +-
 ...chineInfoAveragingPrerequisitesOperator.java |  18 +-
 .../operator/MachineInfoAveragingUnifier.java   |   5 +-
 .../demos/machinedata/util/Combinatorics.java   | 119 +++----
 .../demos/machinedata/util/DataTable.java       |  60 ++--
 .../machinedata/CalculatorOperatorTest.java     |  34 +-
 .../datatorrent/demos/mobile/Application.java   |  22 +-
 .../demos/mobile/PhoneEntryOperator.java        |  23 +-
 .../demos/mobile/PhoneMovementGenerator.java    |  56 ++--
 .../demos/mobile/ApplicationTest.java           |   8 +-
 .../demos/mrmonitor/Application.java            |   7 +-
 .../datatorrent/demos/mrmonitor/Constants.java  |  28 +-
 .../demos/mrmonitor/MRJobStatusOperator.java    |  50 ++-
 .../mrmonitor/MRMonitoringApplication.java      |   3 +-
 .../demos/mrmonitor/MRStatusObject.java         |  22 +-
 .../com/datatorrent/demos/mrmonitor/MRUtil.java |   7 +-
 .../demos/mrmonitor/MapToMRObjectOperator.java  |   3 +-
 .../mrmonitor/MrMonitoringApplicationTest.java  |   5 +-
 .../demos/mroperator/DateWritable.java          |  76 ++---
 .../mroperator/HdfsKeyValOutputOperator.java    |   2 +-
 .../mroperator/InvertedIndexApplication.java    |   2 +-
 .../demos/mroperator/LineIndexer.java           |  35 ++-
 .../demos/mroperator/LogCountsPerHour.java      | 313 +++++++++----------
 .../demos/mroperator/LogsCountApplication.java  |   2 +-
 .../demos/mroperator/MapOperator.java           |  83 ++---
 .../mroperator/NewWordCountApplication.java     |  20 +-
 .../demos/mroperator/OutputCollectorImpl.java   |  87 +++---
 .../demos/mroperator/ReduceOperator.java        | 266 ++++++++--------
 .../demos/mroperator/ReporterImpl.java          | 165 +++++-----
 .../datatorrent/demos/mroperator/WordCount.java |  23 +-
 .../demos/mroperator/MapOperatorTest.java       |  31 +-
 .../demos/mroperator/ReduceOperatorTest.java    |  70 +++--
 .../mroperator/WordCountMRApplicationTest.java  |   7 +-
 .../com/datatorrent/demos/pi/Application.java   |   7 +-
 .../demos/pi/ApplicationAppData.java            |   2 +-
 .../demos/pi/ApplicationWithScript.java         |  13 +-
 .../com/datatorrent/demos/pi/Calculator.java    |   4 +-
 .../datatorrent/demos/pi/NamedValueList.java    |  16 +-
 .../demos/pi/PiCalculateOperator.java           |   5 +-
 .../datatorrent/demos/pi/ApplicationTest.java   |  18 +-
 .../datatorrent/demos/pi/CalculatorTest.java    |   5 +-
 .../demos/r/oldfaithful/FaithfulRScript.java    |  16 +-
 .../demos/r/oldfaithful/InputGenerator.java     |   5 +-
 .../r/oldfaithful/OldFaithfulApplication.java   |   5 +-
 .../oldfaithful/OldFaithfulApplicationTest.java |   2 +-
 .../twitter/KinesisHashtagsApplication.java     |   8 +-
 .../demos/twitter/SlidingContainer.java         |   2 +-
 .../demos/twitter/TwitterDumpApplication.java   |   6 +-
 .../twitter/TwitterDumpHBaseApplication.java    |   8 +-
 .../twitter/TwitterStatusHashtagExtractor.java  |   8 +-
 .../twitter/TwitterStatusURLExtractor.java      |   7 +-
 .../twitter/TwitterStatusWordExtractor.java     |  16 +-
 .../twitter/TwitterTopCounterApplication.java   |  22 +-
 .../twitter/TwitterTopWordsApplication.java     |   6 +-
 .../TwitterTrendingHashtagsApplication.java     |   7 +-
 .../com/datatorrent/demos/twitter/URLSerDe.java |  10 +-
 .../demos/twitter/WindowedTopCounter.java       |  29 +-
 .../twitter/TwitterDumpApplicationTest.java     |   3 +-
 .../demos/twitter/TwitterTopCounterTest.java    |   5 +-
 .../demos/twitter/TwitterTopWordsTest.java      |  11 +-
 .../demos/uniquecount/Application.java          |   6 +-
 .../demos/uniquecount/CountVerifier.java        |  48 ++-
 .../demos/uniquecount/RandomDataGenerator.java  |  18 +-
 .../demos/uniquecount/RandomKeyValues.java      | 222 ++++++-------
 .../demos/uniquecount/RandomKeysGenerator.java  |  18 +-
 .../uniquecount/UniqueKeyValCountDemo.java      |   7 +-
 .../demos/uniquecount/ApplicationTest.java      |   4 +-
 .../demos/uniquecount/UniqueKeyValDemoTest.java |   4 +-
 .../demos/wordcount/Application.java            |  19 +-
 .../wordcount/ApplicationWithQuerySupport.java  |  33 +-
 .../demos/wordcount/FileWordCount.java          |  45 +--
 .../datatorrent/demos/wordcount/LineReader.java |   6 +-
 .../com/datatorrent/demos/wordcount/WCPair.java |  17 +-
 .../demos/wordcount/WindowWordCount.java        |   4 +-
 .../demos/wordcount/WordCountInputOperator.java | 128 ++++----
 .../demos/wordcount/WordCountWriter.java        |  10 +-
 .../datatorrent/demos/wordcount/WordReader.java |  14 +-
 .../demos/wordcount/ApplicationTest.java        |  14 +-
 .../yahoofinance/ApplicationWithDerbySQL.java   |  15 +-
 .../demos/yahoofinance/StockTickInput.java      |  50 +--
 .../yahoofinance/YahooFinanceApplication.java   |   9 +-
 .../YahooFinanceCSVInputOperator.java           |  28 +-
 .../demos/yahoofinance/ApplicationTest.java     |   5 +-
 .../ApplicationWithDerbySQLTest.java            |  12 +-
 144 files changed, 2238 insertions(+), 2102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java
 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java
index d085744..417ed7c 100644
--- 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java
+++ 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java
@@ -39,7 +39,8 @@ public class CountVerifier implements Operator
   Map<Integer, Integer> trueCount = new HashMap<Integer, Integer>();
   Map<Integer, Integer> receivedCount = new HashMap<Integer, Integer>();
 
-  public transient final DefaultInputPort<KeyValPair<Integer, Integer>> trueIn 
= new DefaultInputPort<KeyValPair<Integer, Integer>>() {
+  public final transient DefaultInputPort<KeyValPair<Integer, Integer>> trueIn 
= new DefaultInputPort<KeyValPair<Integer, Integer>>()
+  {
     @Override
     public void process(KeyValPair<Integer, Integer> tuple)
     {
@@ -47,7 +48,8 @@ public class CountVerifier implements Operator
     }
   };
 
-  public transient final DefaultInputPort<KeyValPair<Integer, Integer>> recIn 
= new DefaultInputPort<KeyValPair<Integer, Integer>>() {
+  public final transient DefaultInputPort<KeyValPair<Integer, Integer>> recIn 
= new DefaultInputPort<KeyValPair<Integer, Integer>>()
+  {
     @Override
     public void process(KeyValPair<Integer, Integer> tuple)
     {
@@ -56,9 +58,9 @@ public class CountVerifier implements Operator
   };
 
   @OutputPortFieldAnnotation(optional = true)
-  public transient final DefaultOutputPort<Integer> successPort = new 
DefaultOutputPort<Integer>();
+  public final transient DefaultOutputPort<Integer> successPort = new 
DefaultOutputPort<Integer>();
   @OutputPortFieldAnnotation(optional = true)
-  public transient final DefaultOutputPort<Integer> failurePort = new 
DefaultOutputPort<Integer>();
+  public final transient DefaultOutputPort<Integer> failurePort = new 
DefaultOutputPort<Integer>();
 
   @Override
   public void setup(OperatorContext arg0)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java
 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java
index 28baf06..bf8a307 100644
--- 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java
+++ 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java
@@ -42,38 +42,39 @@ public class IntegerUniqueValueCountAppender extends 
UniqueValueCountAppender<In
   {
     Set<Integer> valSet = new HashSet<Integer>();
     try {
-      while (resultSet.next())
+      while (resultSet.next()) {
         valSet.add(resultSet.getInt(1));
+      }
       return valSet;
     } catch (SQLException e) {
       throw new RuntimeException("while processing the result set", e);
     }
   }
-  
+
   @Override
   protected void prepareGetStatement(PreparedStatement getStatement, Object 
key) throws SQLException
   {
-    getStatement.setInt(1, (Integer) key);
+    getStatement.setInt(1, (Integer)key);
   }
 
   @Override
   protected void preparePutStatement(PreparedStatement putStatement, Object 
key, Object value) throws SQLException
   {
     @SuppressWarnings("unchecked")
-    Set<Integer> valueSet = (Set<Integer>) value;
+    Set<Integer> valueSet = (Set<Integer>)value;
     for (Integer val : valueSet) {
       @SuppressWarnings("unchecked")
-      Set<Integer> currentVals = (Set<Integer>) get(key);
+      Set<Integer> currentVals = (Set<Integer>)get(key);
       if (!currentVals.contains(val)) {
         batch = true;
-        putStatement.setInt(1, (Integer) key);
+        putStatement.setInt(1, (Integer)key);
         putStatement.setInt(2, val);
         putStatement.setLong(3, windowID);
         putStatement.addBatch();
       }
     }
   }
-  
+
   @Override
   public void endWindow()
   {
@@ -84,7 +85,7 @@ public class IntegerUniqueValueCountAppender extends 
UniqueValueCountAppender<In
       while (resultSet.next()) {
         int val = resultSet.getInt(1);
         @SuppressWarnings("unchecked")
-        Set<Integer> valSet = (Set<Integer>) cacheManager.get(val);
+        Set<Integer> valSet = (Set<Integer>)cacheManager.get(val);
         output.emit(new KeyValPair<Object, Object>(val, valSet.size()));
       }
     } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java
 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java
index bb12063..c8016da 100644
--- 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java
+++ 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java
@@ -24,7 +24,8 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
@@ -75,8 +76,9 @@ public class RandomKeyValGenerator implements InputOperator
         verport.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), 
e.getValue().size()));
       }
     }
-    if(clearHistory)
+    if (clearHistory) {
       valhistory.clear();
+    }
   }
 
   @Override
@@ -130,7 +132,7 @@ public class RandomKeyValGenerator implements InputOperator
 
   /**
    * Sets the number of possible keys to numKeys
-   * 
+   *
    * @param numKeys
    *          the new number of possible keys
    */
@@ -141,7 +143,7 @@ public class RandomKeyValGenerator implements InputOperator
 
   /**
    * Returns the number of possible values that can be emitted
-   * 
+   *
    * @return the number of possible values that can be emitted
    */
   public int getNumVals()
@@ -151,7 +153,7 @@ public class RandomKeyValGenerator implements InputOperator
 
   /**
    * Sets the number of possible values that can be emitted to numVals
-   * 
+   *
    * @param numVals
    *          the number of possible values that can be emitted
    */
@@ -162,7 +164,7 @@ public class RandomKeyValGenerator implements InputOperator
 
   /**
    * Sets the number of KeyValPairs to be emitted to tupleBlast
-   * 
+   *
    * @param tupleBlast
    *          the new number of KeyValPairs to be emitted
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
index 7c91f77..3f14e0f 100644
--- 
a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
+++ 
b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
@@ -29,19 +29,18 @@ import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.validation.constraints.Min;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.lib.algo.UniqueValueCount;
-import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
-import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
+import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -106,8 +105,7 @@ public abstract class UniqueValueCountAppender<V> extends 
JDBCLookupCacheBackedO
           deleteStatement.executeUpdate();
         }
       }
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
@@ -118,7 +116,7 @@ public abstract class UniqueValueCountAppender<V> extends 
JDBCLookupCacheBackedO
 
     Object key = getKeyFromTuple(tuple);
     @SuppressWarnings("unchecked")
-    Set<Object> values = (Set<Object>) cacheManager.get(key);
+    Set<Object> values = (Set<Object>)cacheManager.get(key);
     if (values == null) {
       values = Sets.newHashSet();
     }
@@ -154,8 +152,7 @@ public abstract class UniqueValueCountAppender<V> extends 
JDBCLookupCacheBackedO
         putStatement.executeBatch();
         putStatement.clearBatch();
       }
-    }
-    catch (SQLException e) {
+    } catch (SQLException e) {
       throw new RuntimeException("while executing insert", e);
     }
   }
@@ -210,8 +207,7 @@ public abstract class UniqueValueCountAppender<V> extends 
JDBCLookupCacheBackedO
         UniqueValueCountAppender<V> statefulUniqueCount = 
this.getClass().newInstance();
         DefaultPartition<UniqueValueCountAppender<V>> partition = new 
DefaultPartition<UniqueValueCountAppender<V>>(statefulUniqueCount);
         newPartitions.add(partition);
-      }
-      catch (Throwable cause) {
+      } catch (Throwable cause) {
         DTThrowable.rethrow(cause);
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java
 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java
index 3ebfcb1..ef5473f 100644
--- 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java
+++ 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.demos.distributeddistinct;
 import org.junit.Test;
 
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.demos.distributeddistinct.Application;
 
 public class ApplicationTest
 {
@@ -30,4 +29,4 @@ public class ApplicationTest
   {
     LocalMode.runApp(new Application(), 15000);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
index d32047a..e013217 100644
--- 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
+++ 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java
@@ -48,8 +48,8 @@ public class DistributedDistinctTest
 {
   private static final Logger logger = 
LoggerFactory.getLogger(DistributedDistinctTest.class);
 
-  private final static String APP_ID = "DistributedDistinctTest";
-  private final static int OPERATOR_ID = 0;
+  private static final String APP_ID = "DistributedDistinctTest";
+  private static final int OPERATOR_ID = 0;
 
   public static final String INMEM_DB_URL = 
"jdbc:hsqldb:mem:test;sql.syntax_mys=true";
   public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java
 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java
index ee7c1f1..57ac964 100644
--- 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java
+++ 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java
@@ -24,18 +24,19 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.LocalMode;
-import com.datatorrent.demos.distributeddistinct.StatefulApplication;
 
 public class StatefulApplicationTest
 {
-  
+
   @BeforeClass
-  public static void setup(){
+  public static void setup()
+  {
     try {
       Class.forName(StatefulUniqueCountTest.INMEM_DB_DRIVER).newInstance();
       Connection con = 
DriverManager.getConnection(StatefulUniqueCountTest.INMEM_DB_URL, new 
Properties());
@@ -51,27 +52,27 @@ public class StatefulApplicationTest
       throw new RuntimeException(e);
     }
   }
-  
+
   @Test
   public void testApplication() throws Exception
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-    conf.set("dt.operator.StatefulUniqueCounter.prop.tableName", 
"Test_Lookup_Cache"); 
+    conf.set("dt.operator.StatefulUniqueCounter.prop.tableName", 
"Test_Lookup_Cache");
     conf.set("dt.operator.StatefulUniqueCounter.prop.store.dbUrl", 
"jdbc:hsqldb:mem:test;sql.syntax_mys=true");
     conf.set("dt.operator.StatefulUniqueCounter.prop.store.dbDriver", 
"org.hsqldb.jdbcDriver");
-      
+
     lma.prepareDAG(new StatefulApplication(), conf);
     lma.cloneDAG();
     LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(false);
     lc.runAsync();
-    
+
     long now = System.currentTimeMillis();
     while (System.currentTimeMillis() - now < 15000) {
       Thread.sleep(1000);
     }
-    
+
     lc.shutdown();
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
----------------------------------------------------------------------
diff --git 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
index 55f1c8e..a1ac603 100644
--- 
a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
+++ 
b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java
@@ -18,20 +18,29 @@
  */
 package com.datatorrent.demos.distributeddistinct;
 
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Properties;
 
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.datatorrent.api.*;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.common.util.BaseOperator;
-import 
com.datatorrent.demos.distributeddistinct.IntegerUniqueValueCountAppender;
 
 import com.datatorrent.lib.algo.UniqueValueCount;
 import com.datatorrent.lib.util.KeyValPair;
@@ -96,7 +105,8 @@ public class StatefulUniqueCountTest
     private static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver";
     protected static final String TABLE_NAME = "Test_Lookup_Cache";
 
-    public final transient DefaultInputPort<Object> input = new 
DefaultInputPort<Object>() {
+    public final transient DefaultInputPort<Object> input = new 
DefaultInputPort<Object>()
+    {
       @Override
       public void process(Object tuple)
       {
@@ -196,7 +206,8 @@ public class StatefulUniqueCountTest
   }
 
   @BeforeClass
-  public static void setup(){
+  public static void setup()
+  {
     try {
       Class.forName(INMEM_DB_DRIVER).newInstance();
       Connection con = DriverManager.getConnection(INMEM_DB_URL, new 
Properties());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java
----------------------------------------------------------------------
diff --git 
a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java
 
b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java
index 74a8f99..90a3fd2 100644
--- 
a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java
+++ 
b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java
@@ -27,7 +27,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 /**
  * @since 2.1.0
  */
-@ApplicationAnnotation(name="EchoServer")
+@ApplicationAnnotation(name = "EchoServer")
 public class Application implements StreamingApplication
 {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java
----------------------------------------------------------------------
diff --git 
a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java
 
b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java
index 7b8b423..ce7a1bc 100644
--- 
a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java
+++ 
b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java
@@ -23,9 +23,9 @@ import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.DatagramChannel;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * @since 2.1.0
@@ -40,7 +40,7 @@ public class MessageResponder extends BaseOperator
   private transient NetworkManager.ChannelAction<DatagramChannel> action;
   private transient ByteBuffer buffer;
 
-  public transient final DefaultInputPort<Message> messageInput = new 
DefaultInputPort<Message>()
+  public final transient DefaultInputPort<Message> messageInput = new 
DefaultInputPort<Message>()
   {
     @Override
     public void process(Message message)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java
----------------------------------------------------------------------
diff --git 
a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java
 
b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java
index a89e09d..056068f 100644
--- 
a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java
+++ 
b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java
@@ -23,8 +23,15 @@ import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.nio.channels.*;
-import java.util.*;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
@@ -37,7 +44,11 @@ public class NetworkManager implements Runnable
 {
   private static final Logger logger = 
LoggerFactory.getLogger(NetworkManager.class);
 
-  public static enum ConnectionType { TCP, UDP };
+  public static enum ConnectionType
+  {
+    TCP,
+    UDP
+  }
 
   private static NetworkManager _instance;
   private Selector selector;
@@ -180,36 +191,48 @@ public class NetworkManager implements Runnable
     }
   }
 
-  public static interface ChannelListener<T extends SelectableChannel> {
+  public static interface ChannelListener<T extends SelectableChannel>
+  {
     public void ready(ChannelAction<T> action, int readyOps);
   }
 
-  public static class ChannelConfiguration<T extends SelectableChannel> {
+  public static class ChannelConfiguration<T extends SelectableChannel>
+  {
     public T channel;
     public ConnectionInfo connectionInfo;
     public Collection<ChannelAction> actions;
   }
 
-  public static class ChannelAction<T extends SelectableChannel> {
+  public static class ChannelAction<T extends SelectableChannel>
+  {
     public ChannelConfiguration<T> channelConfiguration;
     public ChannelListener<T> listener;
     public int ops;
   }
 
-  private static class ConnectionInfo {
+  private static class ConnectionInfo
+  {
     public SocketAddress address;
     public ConnectionType connectionType;
 
     @Override
     public boolean equals(Object o)
     {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
 
-      ConnectionInfo that = (ConnectionInfo) o;
+      ConnectionInfo that = (ConnectionInfo)o;
 
-      if (connectionType != that.connectionType) return false;
-      if (!address.equals(that.address)) return false;
+      if (connectionType != that.connectionType) {
+        return false;
+      }
+      if (!address.equals(that.address)) {
+        return false;
+      }
 
       return true;
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java
 
b/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java
index a25e47f..8c52a9d 100644
--- 
a/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java
+++ 
b/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java
@@ -32,10 +32,12 @@ import com.datatorrent.api.LocalMode;
 /**
  * Test the DAG declaration in local mode.
  */
-public class ApplicationTest {
+public class ApplicationTest
+{
 
   @Test
-  public void testApplication() throws IOException, Exception {
+  public void testApplication() throws IOException, Exception
+  {
     try {
       LocalMode lma = LocalMode.newInstance();
       Configuration conf = new Configuration(false);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java
index c28697e..8d7c325 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java
@@ -26,7 +26,6 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.demos.frauddetect.operator.HdfsStringOutputOperator;
 import com.datatorrent.demos.frauddetect.operator.MongoDBOutputOperator;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -36,6 +35,7 @@ import com.datatorrent.lib.math.RangeKeyVal;
 import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
 import com.datatorrent.lib.util.BaseKeyValueOperator;
 import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
 
 
 /**
@@ -43,11 +43,10 @@ import com.datatorrent.lib.util.KeyValPair;
  *
  * @since 0.9.0
  */
-@ApplicationAnnotation(name="FraudDetectDemo")
+@ApplicationAnnotation(name = "FraudDetectDemo")
 public class Application implements StreamingApplication
 {
 
-
   public PubSubWebSocketInputOperator getPubSubWebSocketInputOperator(String 
name, DAG dag, URI duri, String topic) throws Exception
   {
     PubSubWebSocketInputOperator reqin = dag.addOperator(name, new 
PubSubWebSocketInputOperator());
@@ -78,7 +77,8 @@ public class Application implements StreamingApplication
     return oper;
   }
 
-  public static class KeyPartitionCodec<K, V> extends 
BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable {
+  public static class KeyPartitionCodec<K, V> extends 
BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable
+  {
     private static final long serialVersionUID = 201410031623L;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java
index 5fb665c..b813a40 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java
@@ -18,23 +18,25 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.demos.frauddetect.util.JsonUtils;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.validation.constraints.NotNull;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.demos.frauddetect.util.JsonUtils;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * Generate an alert if the current transaction amount received on tx input 
port for the given key is greater by n %
@@ -45,8 +47,8 @@ import java.util.Map;
 public class AverageAlertingOperator extends BaseOperator
 {
   private static final Logger Log = 
LoggerFactory.getLogger(AverageAlertingOperator.class);
-  private transient final JsonFactory jsonFactory = new JsonFactory();
-  private transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
   private Map<MerchantKey, MutableDouble> lastSMAMap = new 
HashMap<MerchantKey, MutableDouble>();
   private Map<MerchantKey, MutableDouble> currentSMAMap = new 
HashMap<MerchantKey, MutableDouble>();
   private List<AverageAlertData> alerts = new ArrayList<AverageAlertData>();
@@ -57,7 +59,7 @@ public class AverageAlertingOperator extends BaseOperator
   public final transient DefaultOutputPort<String> avgAlertOutputPort = new 
DefaultOutputPort<String>();
   public final transient DefaultOutputPort<Map<String, Object>> 
avgAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>();
   public final transient DefaultInputPort<KeyValPair<MerchantKey, Double>> 
smaInputPort =
-          new DefaultInputPort<KeyValPair<MerchantKey, Double>>()
+      new DefaultInputPort<KeyValPair<MerchantKey, Double>>()
   {
     @Override
     public void process(KeyValPair<MerchantKey, Double> tuple)
@@ -67,8 +69,7 @@ public class AverageAlertingOperator extends BaseOperator
         double sma = tuple.getValue();
         currentSMAMap.put(tuple.getKey(), new MutableDouble(sma));
         //lastSMAMap.put(tuple.getKey(), new MutableDouble(sma));
-      }
-      else { // move the current SMA value to the last SMA Map
+      } else { // move the current SMA value to the last SMA Map
         //lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue());
         currentSma.setValue(tuple.getValue());  // update the current SMA value
       }
@@ -76,7 +77,7 @@ public class AverageAlertingOperator extends BaseOperator
 
   };
   public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> 
txInputPort =
-          new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
+      new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
   {
     @Override
     public void process(KeyValPair<MerchantKey, Long> tuple)
@@ -100,8 +101,7 @@ public class AverageAlertingOperator extends BaseOperator
         //if (userGenerated) {   // if its user generated only the pass it to 
WebSocket
         if (merchantKey.merchantType == 
MerchantTransaction.MerchantType.BRICK_AND_MORTAR) {
           avgAlertNotificationPort.emit(getOutputData(data, 
String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, 
merchantKey.merchantId, merchantKey.terminalId)));
-        }
-        else { // its internet based
+        } else { // its internet based
           avgAlertNotificationPort.emit(getOutputData(data, 
String.format(internetAlertMsg, txValue, change, lastSmaValue, 
merchantKey.merchantId)));
 
         }
@@ -116,8 +116,7 @@ public class AverageAlertingOperator extends BaseOperator
     for (AverageAlertData data : alerts) {
       try {
         avgAlertOutputPort.emit(JsonUtils.toJson(data));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         logger.warn("Exception while converting object to JSON", e);
       }
     }
@@ -131,8 +130,7 @@ public class AverageAlertingOperator extends BaseOperator
       if (lastSma == null) {
         lastSma = new MutableDouble(currentSma.doubleValue());
         lastSMAMap.put(key, lastSma);
-      }
-      else {
+      } else {
         lastSma.setValue(currentSma.getValue());
       }
     }
@@ -167,8 +165,7 @@ public class AverageAlertingOperator extends BaseOperator
     try {
       String str = mapper.writeValueAsString(output);
       logger.debug("user generated tx alert: " + str);
-    }
-    catch (Exception exc) {
+    } catch (Exception exc) {
       //ignore
     }
     return output;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java
index df0aa7e..87cf043 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.lib.util.TimeBucketKey;
-
 import java.io.Serializable;
 
+import com.datatorrent.lib.util.TimeBucketKey;
+
 /**
  * Bank Id Number Key
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java
index 642c702..abfa202 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java
@@ -18,23 +18,26 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.demos.frauddetect.util.JsonUtils;
-import org.apache.commons.lang.mutable.MutableLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.demos.frauddetect.util.JsonUtils;
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * Count the transactions for the underlying aggregation window if the same 
BIN is
  * being used for more than defined number of transactions. Output the data as 
needed
@@ -44,19 +47,19 @@ import org.codehaus.jackson.map.ObjectMapper;
  */
 public class BankIdNumberSamplerOperator extends BaseOperator
 {
-  private transient final JsonFactory jsonFactory = new JsonFactory();
-  private transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
   private int threshold;
   private Map<MerchantKey, Map<String, BankIdNumData>> bankIdNumCountMap = new 
HashMap<MerchantKey, Map<String, BankIdNumData>>();
   private static final String ALERT_MSG =
-          "Potential fraudulent CC transactions (same bank id %s and merchant 
%s) total transactions: %d";
+      "Potential fraudulent CC transactions (same bank id %s and merchant %s) 
total transactions: %d";
   /**
    * Output the key-value pair for the BIN as key with the count as value.
    */
   public final transient DefaultOutputPort<String> countAlertOutputPort =
-          new DefaultOutputPort<String>();
+      new DefaultOutputPort<String>();
   public final transient DefaultOutputPort<Map<String, Object>> 
countAlertNotificationPort =
-          new DefaultOutputPort<Map<String, Object>>();
+      new DefaultOutputPort<Map<String, Object>>();
 
   public int getThreshold()
   {
@@ -103,7 +106,7 @@ public class BankIdNumberSamplerOperator extends 
BaseOperator
   */
 
   public final transient DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, 
String>, Integer>> txCountInputPort =
-          new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, 
Integer>>()
+      new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, 
Integer>>()
   {
     @Override
     public void process(KeyValPair<KeyValPair<MerchantKey, String>, Integer> 
tuple)
@@ -162,8 +165,7 @@ public class BankIdNumberSamplerOperator extends 
BaseOperator
           try {
             countAlertOutputPort.emit(JsonUtils.toJson(data));
             countAlertNotificationPort.emit(getOutputData(data));
-          }
-          catch (IOException e) {
+          } catch (IOException e) {
             logger.warn("Exception while converting object to JSON: ", e);
           }
         }
@@ -196,15 +198,14 @@ public class BankIdNumberSamplerOperator extends 
BaseOperator
     try {
       String str = mapper.writeValueAsString(output);
       logger.debug("user generated tx alert: " + str);
-    }
-    catch (Exception exc) {
+    } catch (Exception exc) {
       //ignore
     }
 
     return output;
   }
 
-  public final static class BankIdNumData
+  public static final class BankIdNumData
   {
     public String bankIdNum;
     public MutableLong count = new MutableLong();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java
index d728306..a232fd4 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java
@@ -18,19 +18,26 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.demos.frauddetect.util.JsonUtils;
-import org.apache.commons.lang.mutable.MutableLong;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.map.ObjectMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
+import org.apache.commons.lang.mutable.MutableLong;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.demos.frauddetect.util.JsonUtils;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  * An operator to alert in case a transaction of a small lowAmount is followed 
by a transaction which is significantly larger for a given credit card number.
@@ -42,8 +49,8 @@ import java.util.*;
  */
 public class CreditCardAmountSamplerOperator extends BaseOperator
 {
-  private transient final JsonFactory jsonFactory = new JsonFactory();
-  private transient final ObjectMapper mapper = new ObjectMapper(jsonFactory);
+  private final transient JsonFactory jsonFactory = new JsonFactory();
+  private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory);
   private static final Logger logger = 
LoggerFactory.getLogger(Application.class);
   // Factor to be applied to existing lowAmount to flag potential alerts.
   private double threshold = 9500;
@@ -52,7 +59,7 @@ public class CreditCardAmountSamplerOperator extends 
BaseOperator
   private List<CreditCardAlertData> alerts = new 
ArrayList<CreditCardAlertData>();
   //private List<CreditCardAlertData> userAlerts = new 
ArrayList<CreditCardAlertData>();
   private static final String ALERT_MSG =
-          "Potential fraudulent CC transactions (small one USD %d followed by 
large USD %d) performed using credit card: %s";
+      "Potential fraudulent CC transactions (small one USD %d followed by 
large USD %d) performed using credit card: %s";
   public final transient DefaultOutputPort<String> ccAlertOutputPort = new 
DefaultOutputPort<String>();
   /*
    public final transient DefaultOutputPort<Map<String, Object>> 
ccUserAlertOutputPort = new DefaultOutputPort<Map<String, Object>>();
@@ -82,8 +89,7 @@ public class CreditCardAmountSamplerOperator extends 
BaseOperator
       if (ccAmount < currentSmallValue) {
         cardInfo.lowAmount.setValue(ccAmount);
         cardInfo.time = key.time;
-      }
-      else if (ccAmount > (currentSmallValue + threshold)) {
+      } else if (ccAmount > (currentSmallValue + threshold)) {
         // If the transaction lowAmount is > 70% of the min. lowAmount, send 
an alert.
 
         CreditCardAlertData data = new CreditCardAlertData();
@@ -113,8 +119,7 @@ public class CreditCardAmountSamplerOperator extends 
BaseOperator
         // alert not resetting the low value from a user generated transaction
         //txMap.remove(fullCcNum);
       }
-    }
-    else {
+    } else {
       cardInfo = new CreditCardInfo();
       cardInfo.lowAmount.setValue(ccAmount);
       cardInfo.time = key.time;
@@ -123,7 +128,7 @@ public class CreditCardAmountSamplerOperator extends 
BaseOperator
   }
 
   public transient DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>> 
inputPort =
-          new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>()
+      new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>()
   {
     //
     // This function checks if a CC entry exists.
@@ -147,8 +152,7 @@ public class CreditCardAmountSamplerOperator extends 
BaseOperator
     for (CreditCardAlertData data : alerts) {
       try {
         ccAlertOutputPort.emit(JsonUtils.toJson(data));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         logger.warn("Exception while converting object to JSON", e);
       }
     }
@@ -192,8 +196,7 @@ public class CreditCardAmountSamplerOperator extends 
BaseOperator
     try {
       String str = mapper.writeValueAsString(output);
       logger.debug("Alert generated: " + str + " userGenerated: " + 
data.userGenerated);
-    }
-    catch (Exception exc) {
+    } catch (Exception exc) {
       //ignore
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java
index 7347790..75e279c 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java
@@ -30,12 +30,12 @@ public class MerchantTransaction implements Serializable
   public enum MerchantType
   {
     UNDEFINED, BRICK_AND_MORTAR, INTERNET
-  };
+  }
 
   public enum TransactionType
   {
     UNDEFINED, POS
-  };
+  }
 
   public String ccNum;
   public String bankIdNum;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java
index 67ab2e9..415b7be 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,19 +18,21 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.KeyValPair;
 import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.lang.mutable.MutableLong;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * A bucket-like operator to accept merchant transaction object and dissipate 
the
  * transaction amount to the further downstream operator for calculating min, 
max and std-deviation.
@@ -45,13 +47,13 @@ public class MerchantTransactionBucketOperator extends 
BaseOperator
           new DefaultOutputPort<KeyValPair<MerchantKey, String>>();
   */
   public final transient DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, 
String>, Integer>> binCountOutputPort =
-          new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, 
Integer>>();
+      new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, 
Integer>>();
   public final transient DefaultOutputPort<KeyValPair<MerchantKey, Long>> 
txOutputPort =
-          new DefaultOutputPort<KeyValPair<MerchantKey, Long>>();
+      new DefaultOutputPort<KeyValPair<MerchantKey, Long>>();
   public final transient DefaultOutputPort<KeyValPair<MerchantKey, 
CreditCardData>> ccAlertOutputPort =
-          new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>();
+      new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>();
   public final transient DefaultOutputPort<Map<String, Object>> 
summaryTxnOutputPort =
-          new DefaultOutputPort<Map<String, Object>>();
+      new DefaultOutputPort<Map<String, Object>>();
   private MutableLong totalTxns = new MutableLong(0);
   private MutableLong txnsInLastSecond = new MutableLong(0);
   private MutableDouble amtInLastSecond = new MutableDouble(0);
@@ -75,26 +77,26 @@ public class MerchantTransactionBucketOperator extends 
BaseOperator
 
   };
 
-  public void endWindow() {
-      Map<String, Object> summary = new HashMap<String, Object>();
-      double avg;
-      if (txnsInLastSecond.longValue() == 0) {
-          avg = 0;
-      } else {
-          avg = amtInLastSecond.doubleValue() / txnsInLastSecond.longValue();
-      }
-      summary.put("totalTxns", totalTxns);
-      summary.put("txnsInLastSecond", txnsInLastSecond);
-      summary.put("amtInLastSecond", amtFormatter.format(amtInLastSecond));
-      summary.put("avgAmtInLastSecond", amtFormatter.format(avg));
-      summaryTxnOutputPort.emit(summary);
-      txnsInLastSecond.setValue(0);
-      amtInLastSecond.setValue(0);
+  public void endWindow()
+  {
+    Map<String, Object> summary = new HashMap<String, Object>();
+    double avg;
+    if (txnsInLastSecond.longValue() == 0) {
+      avg = 0;
+    } else {
+      avg = amtInLastSecond.doubleValue() / txnsInLastSecond.longValue();
+    }
+    summary.put("totalTxns", totalTxns);
+    summary.put("txnsInLastSecond", txnsInLastSecond);
+    summary.put("amtInLastSecond", amtFormatter.format(amtInLastSecond));
+    summary.put("avgAmtInLastSecond", amtFormatter.format(avg));
+    summaryTxnOutputPort.emit(summary);
+    txnsInLastSecond.setValue(0);
+    amtInLastSecond.setValue(0);
   }
 
   private void processTuple(MerchantTransaction tuple)
   {
-    //emitBankIdNumTuple(tuple, binOutputPort);
     emitBankIdNumTuple(tuple, binCountOutputPort);
     emitMerchantKeyTuple(tuple, txOutputPort);
     emitCreditCardKeyTuple(tuple, ccAlertOutputPort);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java
index 5f9b7ee..49b61aa 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java
@@ -19,12 +19,16 @@
 package com.datatorrent.demos.frauddetect;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
 
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.demos.frauddetect.util.JsonUtils;
@@ -37,8 +41,8 @@ import com.datatorrent.demos.frauddetect.util.JsonUtils;
 public class MerchantTransactionGenerator extends BaseOperator implements 
InputOperator
 {
   private final Random randomNum = new Random();
-  public static final int zipCodes[] = {94086, 94087, 94088, 94089, 94090, 
94091, 94092, 94093};
-  public static final String merchantIds[] = {"Wal-Mart", "Target", "Amazon", 
"Apple", "Sears", "Macys", "JCPenny", "Levis"};
+  public static final int[] zipCodes = {94086, 94087, 94088, 94089, 94090, 
94091, 94092, 94093};
+  public static final String[] merchantIds = {"Wal-Mart", "Target", "Amazon", 
"Apple", "Sears", "Macys", "JCPenny", "Levis"};
 //    public static final String bankIdNums[] = { "1111 1111 1111", "2222 2222 
2222", "3333 3333 3333", "4444 4444 4444", "5555 5555 5555", "6666 6666 6666", 
"7777 7777 7777", "8888 8888 8888"};
 //    public static final String ccNums[] = { "0001", "0002", "0003", "0004", 
"0005", "0006", "0007", "0008"};
 //    public static final String bankIdNums[] = { "1111 1111 1111", "2222 2222 
2222", "3333 3333 3333", "4444 4444 4444"};
@@ -67,9 +71,9 @@ public class MerchantTransactionGenerator extends 
BaseOperator implements InputO
   }
 
   public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
-          new DefaultOutputPort<MerchantTransaction>();
+      new DefaultOutputPort<MerchantTransaction>();
   public transient DefaultOutputPort<String> txDataOutputPort =
-          new DefaultOutputPort<String>();
+      new DefaultOutputPort<String>();
 
   @Override
   public void emitTuples()
@@ -127,8 +131,7 @@ public class MerchantTransactionGenerator extends 
BaseOperator implements InputO
     for (MerchantTransaction txData : txList) {
       try {
         txDataOutputPort.emit(JsonUtils.toJson(txData));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         logger.warn("Exception while converting object to JSON", e);
       }
     }
@@ -136,8 +139,7 @@ public class MerchantTransactionGenerator extends 
BaseOperator implements InputO
 
     try {
       Thread.sleep(100);
-    }
-    catch (InterruptedException e) {
+    } catch (InterruptedException e) {
       e.printStackTrace();
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java
index 0af8836..cdc829d 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Common utility class that can be used by all other operators to handle user 
input
@@ -41,7 +42,7 @@ public class MerchantTransactionInputHandler extends 
BaseOperator
   public static final String KEY_ZIP_CODE = "zipCode";
   public static final String KEY_AMOUNT = "amount";
   public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
-          new DefaultOutputPort<MerchantTransaction>();
+      new DefaultOutputPort<MerchantTransaction>();
   public transient DefaultInputPort<Map<String, String>> userTxInputPort = new 
DefaultInputPort<Map<String, String>>()
   {
     @Override
@@ -49,8 +50,7 @@ public class MerchantTransactionInputHandler extends 
BaseOperator
     {
       try {
         txOutputPort.emit(processInput(tuple));
-      }
-      catch (Exception exc) {
+      } catch (Exception exc) {
         logger.error("Exception while handling the input", exc);
       }
     }
@@ -86,8 +86,7 @@ public class MerchantTransactionInputHandler extends 
BaseOperator
       }
     }
 
-    if (bankIdNum == null || ccNum == null || merchantId == null
-            || terminalId == null || zipCode == null || amount == null) {
+    if (bankIdNum == null || ccNum == null || merchantId == null || terminalId 
== null || zipCode == null || amount == null) {
       throw new IllegalArgumentException("Missing required input!");
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java
index 056967b..2701c14 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java
@@ -20,11 +20,12 @@ package com.datatorrent.demos.frauddetect;
 
 import java.util.ArrayList;
 
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+
 import com.datatorrent.lib.multiwindow.AbstractSlidingWindowKeyVal;
 import com.datatorrent.lib.util.KeyValPair;
 
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 
 /**
  * Sliding window sum operator
@@ -34,27 +35,27 @@ import 
com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 public class SlidingWindowSumKeyVal<K, V extends Number> extends 
AbstractSlidingWindowKeyVal<K, V, SlidingWindowSumObject>
 {
 
-       /**
-        * Output port to emit simple moving average (SMA) of last N window as 
Double.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Double>> 
doubleSum = new DefaultOutputPort<KeyValPair<K, Double>>();
-       /**
-        * Output port to emit simple moving average (SMA) of last N window as 
Float.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum 
= new DefaultOutputPort<KeyValPair<K, Float>>();
-       /**
-        * Output port to emit simple moving average (SMA) of last N window as 
Long.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = 
new DefaultOutputPort<KeyValPair<K, Long>>();
-       /**
-        * Output port to emit simple moving average (SMA) of last N window as
-        * Integer.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Integer>> 
integerSum = new DefaultOutputPort<KeyValPair<K, Integer>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as 
Double.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSum = 
new DefaultOutputPort<KeyValPair<K, Double>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as Float.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum = 
new DefaultOutputPort<KeyValPair<K, Float>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as Long.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = new 
DefaultOutputPort<KeyValPair<K, Long>>();
+  /**
+   * Output port to emit simple moving average (SMA) of last N window as
+   * Integer.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSum 
= new DefaultOutputPort<KeyValPair<K, Integer>>();
 
 
   @Override
@@ -69,7 +70,6 @@ public class SlidingWindowSumKeyVal<K, V extends Number> 
extends AbstractSliding
       }
       buffer.put(key, stateList);
     }
-
     SlidingWindowSumObject state = stateList.get(currentstate);
     state.add(tuple.getValue());
   }
@@ -78,7 +78,7 @@ public class SlidingWindowSumKeyVal<K, V extends Number> 
extends AbstractSliding
   public void emitTuple(K key, ArrayList<SlidingWindowSumObject> obj)
   {
     double sum = 0;
-    for (int i=0; i < obj.size(); ++i) {
+    for (int i = 0; i < obj.size(); ++i) {
       SlidingWindowSumObject state = obj.get(i);
       sum += state.getSum();
     }
@@ -86,13 +86,13 @@ public class SlidingWindowSumKeyVal<K, V extends Number> 
extends AbstractSliding
       doubleSum.emit(new KeyValPair<K, Double>(key, sum));
     }
     if (floatSum.isConnected()) {
-      floatSum.emit(new KeyValPair<K, Float>(key, (float) sum));
+      floatSum.emit(new KeyValPair<K, Float>(key, (float)sum));
     }
     if (longSum.isConnected()) {
-      longSum.emit(new KeyValPair<K, Long>(key, (long) sum));
+      longSum.emit(new KeyValPair<K, Long>(key, (long)sum));
     }
     if (integerSum.isConnected()) {
-      integerSum.emit(new KeyValPair<K, Integer>(key, (int) sum));
+      integerSum.emit(new KeyValPair<K, Integer>(key, (int)sum));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java
index 2075165..3fefb66 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java
@@ -33,12 +33,14 @@ public class SlidingWindowSumObject extends 
SimpleMovingAverageObject
 
   MutableDouble sum = new MutableDouble(0);
 
-  public void add(Number n) {
+  public void add(Number n)
+  {
     sum.add(n);
   }
 
   @Override
-  public double getSum() {
+  public double getSum()
+  {
     return sum.doubleValue();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java
index 4a90618..e226af0 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java
@@ -18,18 +18,19 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.common.util.BaseOperator;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.demos.frauddetect.util.JsonUtils;
 import com.datatorrent.lib.util.HighLow;
 import com.datatorrent.lib.util.KeyValPair;
-import com.datatorrent.demos.frauddetect.util.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Operator to aggregate the min, max, sma, std-dev and variance for the given 
key.
@@ -39,10 +40,10 @@ import java.util.Map;
 public class TransactionStatsAggregator extends BaseOperator
 {
   public Map<MerchantKey, TransactionStatsData> aggrgateMap =
-          new HashMap<MerchantKey, TransactionStatsData>();
+      new HashMap<MerchantKey, TransactionStatsData>();
   public final transient DefaultOutputPort<String> txDataOutputPort = new 
DefaultOutputPort<String>();
   public final transient DefaultInputPort<KeyValPair<MerchantKey, 
HighLow<Long>>> rangeInputPort =
-          new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>()
+      new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>()
   {
     @Override
     public void process(KeyValPair<MerchantKey, HighLow<Long>> tuple)
@@ -55,7 +56,7 @@ public class TransactionStatsAggregator extends BaseOperator
 
   };
   public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> 
smaInputPort =
-          new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
+      new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
   {
     @Override
     public void process(KeyValPair<MerchantKey, Long> tuple)
@@ -87,8 +88,7 @@ public class TransactionStatsAggregator extends BaseOperator
     for (Map.Entry<MerchantKey, TransactionStatsData> entry : 
aggrgateMap.entrySet()) {
       try {
         txDataOutputPort.emit(JsonUtils.toJson(entry.getValue()));
-      }
-      catch (IOException e) {
+      } catch (IOException e) {
         logger.warn("Exception while converting object to JSON", e);
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java
index 86fa921..4b8f851 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java
@@ -18,12 +18,12 @@
  */
 package com.datatorrent.demos.frauddetect.operator;
 
+import java.io.File;
+
 import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 
-import java.io.File;
-
 /**
  * Adapter for writing Strings to HDFS
  * <p>
@@ -65,8 +65,7 @@ public class HdfsStringOutputOperator extends 
AbstractFileOutputOperator<String>
   }
 
   @Override
-  public String getPartFileName(String fileName,
-                                int part)
+  public String getPartFileName(String fileName, int part)
   {
     return fileName + part;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java
index a4a2775..0171c00 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java
@@ -18,146 +18,171 @@
  */
 package com.datatorrent.demos.frauddetect.operator;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.Context;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.validation.constraints.NotNull;
 
-import com.datatorrent.api.DefaultInputPort;
-import com.mongodb.*;
-import com.mongodb.util.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteResult;
+import com.mongodb.util.JSON;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 
 /**
  * Operator to write data into MongoDB
  *
  * @since 0.9.0
  */
-public class MongoDBOutputOperator extends BaseOperator {
-
-    @NotNull
-    protected String hostName;
-    @NotNull
-    protected String dataBase;
-    @NotNull
-    protected String collection;
-
-    protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
-
-    protected String userName;
-    protected String passWord;
-
-    protected transient MongoClient mongoClient;
-    protected transient DB db;
-    protected transient DBCollection dbCollection;
-
-    protected List<DBObject> dataList = new ArrayList<DBObject>();
-
-    public MongoDBOutputOperator() {
-    }
-
-    /**
-     * Take the JSON formatted string and convert it to DBObject
-     */
-    public transient final DefaultInputPort<String> inputPort = new 
DefaultInputPort<String>() {
-        @Override
-        public void process(String tuple) {
-            dataList.add((DBObject)JSON.parse(tuple));
-        }
-    };
-
-    @Override
-    public void setup(Context.OperatorContext context) {
-        super.setup(context);
-        try {
-            mongoClient = new MongoClient(hostName);
-            db = mongoClient.getDB(dataBase);
-            if (userName != null && passWord != null) {
-                if (!db.authenticate(userName, passWord.toCharArray())) {
-                    throw new IllegalArgumentException("MongoDB authentication 
failed. Illegal username and password for MongoDB!!");
-                }
-            }
-            dbCollection = db.getCollection(collection);
-        }
-        catch (UnknownHostException ex) {
-            logger.debug(ex.toString());
-        }
-    }
-
-    @Override
-    public void beginWindow(long windowId) {
-        // nothing
-    }
-
-    @Override
-    public void endWindow() {
-        logger.debug("mongo datalist size: " + dataList.size());
-        if (dataList.size() > 0) {
-            WriteResult result = dbCollection.insert(dataList, writeConcern);
-            logger.debug("Result for MongoDB insert: " + result);
-            dataList.clear();
-        }
-    }
-
+public class MongoDBOutputOperator extends BaseOperator
+{
+  @NotNull
+  protected String hostName;
+  @NotNull
+  protected String dataBase;
+  @NotNull
+  protected String collection;
+
+  protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED;
+
+  protected String userName;
+  protected String passWord;
+
+  protected transient MongoClient mongoClient;
+  protected transient DB db;
+  protected transient DBCollection dbCollection;
+
+  protected List<DBObject> dataList = new ArrayList<DBObject>();
+
+  public MongoDBOutputOperator()
+  {
+  }
+
+  /**
+   * Take the JSON formatted string and convert it to DBObject
+   */
+  public final transient DefaultInputPort<String> inputPort = new 
DefaultInputPort<String>()
+  {
     @Override
-    public void teardown() {
-        if (mongoClient != null) {
-            mongoClient.close();
+    public void process(String tuple)
+    {
+      dataList.add((DBObject)JSON.parse(tuple));
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    try {
+      mongoClient = new MongoClient(hostName);
+      db = mongoClient.getDB(dataBase);
+      if (userName != null && passWord != null) {
+        if (!db.authenticate(userName, passWord.toCharArray())) {
+          throw new IllegalArgumentException("MongoDB authentication failed. 
Illegal username and password for MongoDB!!");
         }
-    }
-
-    public String getHostName() {
-        return hostName;
-    }
-
-    public void setHostName(String hostName) {
-        this.hostName = hostName;
-    }
-
-    public String getDataBase() {
-        return dataBase;
-    }
-
-    public void setDataBase(String dataBase) {
-        this.dataBase = dataBase;
-    }
-
-    public String getCollection() {
-        return collection;
-    }
-
-    public void setCollection(String collection) {
-        this.collection = collection;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-
-    public String getPassWord() {
-        return passWord;
-    }
-
-    public void setPassWord(String passWord) {
-        this.passWord = passWord;
-    }
-
-    public WriteConcern getWriteConcern() {
-        return writeConcern;
-    }
-
-    public void setWriteConcern(WriteConcern writeConcern) {
-        this.writeConcern = writeConcern;
-    }
-
-    private static final Logger logger = 
LoggerFactory.getLogger(MongoDBOutputOperator.class);
+      }
+      dbCollection = db.getCollection(collection);
+    } catch (UnknownHostException ex) {
+      logger.debug(ex.toString());
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // nothing
+  }
+
+  @Override
+  public void endWindow()
+  {
+    logger.debug("mongo datalist size: " + dataList.size());
+    if (dataList.size() > 0) {
+      WriteResult result = dbCollection.insert(dataList, writeConcern);
+      logger.debug("Result for MongoDB insert: " + result);
+      dataList.clear();
+    }
+  }
+
+  @Override
+  public void teardown()
+  {
+    if (mongoClient != null) {
+      mongoClient.close();
+    }
+  }
+
+  public String getHostName()
+  {
+    return hostName;
+  }
+
+  public void setHostName(String hostName)
+  {
+    this.hostName = hostName;
+  }
+
+  public String getDataBase()
+  {
+    return dataBase;
+  }
+
+  public void setDataBase(String dataBase)
+  {
+    this.dataBase = dataBase;
+  }
+
+  public String getCollection()
+  {
+    return collection;
+  }
+
+  public void setCollection(String collection)
+  {
+    this.collection = collection;
+  }
+
+  public String getUserName()
+  {
+    return userName;
+  }
+
+  public void setUserName(String userName)
+  {
+    this.userName = userName;
+  }
+
+  public String getPassWord()
+  {
+    return passWord;
+  }
+
+  public void setPassWord(String passWord)
+  {
+    this.passWord = passWord;
+  }
+
+  public WriteConcern getWriteConcern()
+  {
+    return writeConcern;
+  }
+
+  public void setWriteConcern(WriteConcern writeConcern)
+  {
+    this.writeConcern = writeConcern;
+  }
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MongoDBOutputOperator.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java
 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java
index 1eb87ed..60c200f 100644
--- 
a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java
+++ 
b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java
@@ -18,20 +18,21 @@
  */
 package com.datatorrent.demos.frauddetect.util;
 
-import org.codehaus.jackson.map.ObjectMapper;
-
 import java.io.IOException;
 
+import org.codehaus.jackson.map.ObjectMapper;
+
 /**
  * Utility class to deal with JSON and Object
  *
  * @since 0.9.0
  */
-public class JsonUtils {
-
-    private static final ObjectMapper mapper = new ObjectMapper();
+public class JsonUtils
+{
+  private static final ObjectMapper mapper = new ObjectMapper();
 
-    public static String toJson(Object obj) throws IOException {
-        return mapper.writeValueAsString(obj);
-    }
+  public static String toJson(Object obj) throws IOException
+  {
+    return mapper.writeValueAsString(obj);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java
 
b/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java
index b54ee83..ef1f371 100644
--- 
a/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java
+++ 
b/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java
@@ -18,33 +18,33 @@
  */
 package com.datatorrent.demos.frauddetect;
 
-import com.datatorrent.api.LocalMode;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
 
 /**
  * Fraud detection application test
  */
-public class FrauddetectApplicationTest {
+public class FrauddetectApplicationTest
+{
 
-    public FrauddetectApplicationTest() {
-    }
+  public FrauddetectApplicationTest()
+  {
+  }
 
-    @Test
-    public void testApplication() throws Exception {
-      try
-      {
-        Application application = new Application();
-        Configuration conf = new Configuration(false);
-        conf.addResource("dt-site-frauddetect.xml");
-        LocalMode lma = LocalMode.newInstance();
-        lma.prepareDAG(application, conf);
-        lma.getController().run(120000);
-      }
-      catch(Exception e)
-      {
-        e.printStackTrace();
-      }
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      Application application = new Application();
+      Configuration conf = new Configuration(false);
+      conf.addResource("dt-site-frauddetect.xml");
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(application, conf);
+      lma.getController().run(120000);
+    } catch (Exception e) {
+      e.printStackTrace();
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
----------------------------------------------------------------------
diff --git 
a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
 
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
index 671cc72..21afc5b 100644
--- 
a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
+++ 
b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java
@@ -49,19 +49,19 @@ public class MinimalWordCount implements 
StreamingApplication
   {
     static Map<String, Long> result;
     private static boolean done = false;
-  
+
     public static boolean isDone()
     {
       return done;
     }
-  
+
     @Override
     public void setup(Context.OperatorContext context)
     {
       done = false;
       result = new HashMap<>();
     }
-    
+
     public final transient DefaultInputPort<KeyValPair<String, Long>> input = 
new DefaultInputPort<KeyValPair<String, Long>>()
     {
       @Override
@@ -74,7 +74,7 @@ public class MinimalWordCount implements StreamingApplication
       }
     };
   }
-  
+
   /**
    * Populate the dag using High-Level API.
    * @param dag
@@ -93,7 +93,7 @@ public class MinimalWordCount implements StreamingApplication
           public Iterable<String> f(String input)
           {
             return Arrays.asList(input.split("[^a-zA-Z']+"));
-          
+
           }
         }, name("ExtractWords"))
         // Apply windowing to the stream for counting, in this case, the 
window option is global window.

Reply via email to