Fixed checkstyle errors for demos.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7d9386d2 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7d9386d2 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7d9386d2 Branch: refs/heads/master Commit: 7d9386d2abb445954055f6bf8c6d2aa75f896d77 Parents: 846b4a3 Author: MalharJenkins <[email protected]> Authored: Fri Aug 26 13:33:43 2016 -0700 Committer: Shunxin <[email protected]> Committed: Fri Aug 26 14:07:31 2016 -0700 ---------------------------------------------------------------------- .../distributeddistinct/CountVerifier.java | 10 +- .../IntegerUniqueValueCountAppender.java | 17 +- .../RandomKeyValGenerator.java | 14 +- .../UniqueValueCountAppender.java | 22 +- .../distributeddistinct/ApplicationTest.java | 3 +- .../DistributedDistinctTest.java | 4 +- .../StatefulApplicationTest.java | 19 +- .../StatefulUniqueCountTest.java | 23 +- .../demos/echoserver/Application.java | 2 +- .../demos/echoserver/MessageResponder.java | 4 +- .../demos/echoserver/NetworkManager.java | 47 ++- .../demos/echoserver/ApplicationTest.java | 6 +- .../demos/frauddetect/Application.java | 8 +- .../frauddetect/AverageAlertingOperator.java | 49 ++- .../demos/frauddetect/BankIdNumberKey.java | 4 +- .../BankIdNumberSamplerOperator.java | 41 +-- .../CreditCardAmountSamplerOperator.java | 43 +-- .../demos/frauddetect/MerchantTransaction.java | 4 +- .../MerchantTransactionBucketOperator.java | 60 ++-- .../MerchantTransactionGenerator.java | 22 +- .../MerchantTransactionInputHandler.java | 17 +- .../frauddetect/SlidingWindowSumKeyVal.java | 56 ++-- .../frauddetect/SlidingWindowSumObject.java | 6 +- .../frauddetect/TransactionStatsAggregator.java | 26 +- .../operator/HdfsStringOutputOperator.java | 7 +- .../operator/MongoDBOutputOperator.java | 281 +++++++++-------- .../demos/frauddetect/util/JsonUtils.java | 17 +- .../frauddetect/FrauddetectApplicationTest.java | 40 +-- .../malhar/stream/sample/MinimalWordCount.java | 10 +- .../malhar/stream/sample/WindowedWordCount.java | 66 ++-- .../stream/sample/complete/AutoComplete.java | 17 +- .../sample/complete/CompletionCandidate.java | 2 +- .../stream/sample/complete/PojoEvent.java | 6 +- .../sample/complete/StreamingWordExtract.java | 36 +-- .../sample/complete/TopWikipediaSessions.java | 70 ++--- .../stream/sample/complete/TrafficRoutes.java | 132 ++++---- .../sample/complete/TwitterAutoComplete.java | 3 +- .../sample/cookbook/CombinePerKeyExamples.java | 40 +-- .../stream/sample/cookbook/DeDupExample.java | 24 +- .../stream/sample/cookbook/InputPojo.java | 18 +- .../sample/cookbook/MaxPerKeyExamples.java | 42 +-- .../stream/sample/cookbook/OutputPojo.java | 10 +- .../stream/sample/cookbook/TriggerExample.java | 4 +- .../stream/sample/MinimalWordCountTest.java | 8 +- .../stream/sample/WindowedWordCountTest.java | 10 +- .../sample/complete/AutoCompleteTest.java | 8 +- .../complete/StreamingWordExtractTest.java | 26 +- .../complete/TopWikipediaSessionsTest.java | 8 +- .../sample/complete/TrafficRoutesTest.java | 8 +- .../cookbook/CombinePerKeyExamplesTest.java | 8 +- .../sample/cookbook/DeDupExampleTest.java | 8 +- .../sample/cookbook/MaxPerKeyExamplesTest.java | 60 ++-- .../demos/iteration/Application.java | 23 +- .../demos/iteration/ApplicationTest.java | 14 +- .../demos/machinedata/Application.java | 15 +- .../demos/machinedata/DimensionGenerator.java | 18 +- .../demos/machinedata/InputReceiver.java | 20 +- .../demos/machinedata/data/MachineKey.java | 35 ++- .../demos/machinedata/data/ResourceType.java | 54 ++-- .../operator/CalculatorOperator.java | 25 +- .../operator/MachineInfoAveragingOperator.java | 26 +- ...chineInfoAveragingPrerequisitesOperator.java | 18 +- .../operator/MachineInfoAveragingUnifier.java | 5 +- .../demos/machinedata/util/Combinatorics.java | 119 +++---- .../demos/machinedata/util/DataTable.java | 60 ++-- .../machinedata/CalculatorOperatorTest.java | 34 +- .../datatorrent/demos/mobile/Application.java | 22 +- .../demos/mobile/PhoneEntryOperator.java | 23 +- .../demos/mobile/PhoneMovementGenerator.java | 56 ++-- .../demos/mobile/ApplicationTest.java | 8 +- .../demos/mrmonitor/Application.java | 7 +- .../datatorrent/demos/mrmonitor/Constants.java | 28 +- .../demos/mrmonitor/MRJobStatusOperator.java | 50 ++- .../mrmonitor/MRMonitoringApplication.java | 3 +- .../demos/mrmonitor/MRStatusObject.java | 22 +- .../com/datatorrent/demos/mrmonitor/MRUtil.java | 7 +- .../demos/mrmonitor/MapToMRObjectOperator.java | 3 +- .../mrmonitor/MrMonitoringApplicationTest.java | 5 +- .../demos/mroperator/DateWritable.java | 76 ++--- .../mroperator/HdfsKeyValOutputOperator.java | 2 +- .../mroperator/InvertedIndexApplication.java | 2 +- .../demos/mroperator/LineIndexer.java | 35 ++- .../demos/mroperator/LogCountsPerHour.java | 313 +++++++++---------- .../demos/mroperator/LogsCountApplication.java | 2 +- .../demos/mroperator/MapOperator.java | 83 ++--- .../mroperator/NewWordCountApplication.java | 20 +- .../demos/mroperator/OutputCollectorImpl.java | 87 +++--- .../demos/mroperator/ReduceOperator.java | 266 ++++++++-------- .../demos/mroperator/ReporterImpl.java | 165 +++++----- .../datatorrent/demos/mroperator/WordCount.java | 23 +- .../demos/mroperator/MapOperatorTest.java | 31 +- .../demos/mroperator/ReduceOperatorTest.java | 70 +++-- .../mroperator/WordCountMRApplicationTest.java | 7 +- .../com/datatorrent/demos/pi/Application.java | 7 +- .../demos/pi/ApplicationAppData.java | 2 +- .../demos/pi/ApplicationWithScript.java | 13 +- .../com/datatorrent/demos/pi/Calculator.java | 4 +- .../datatorrent/demos/pi/NamedValueList.java | 16 +- .../demos/pi/PiCalculateOperator.java | 5 +- .../datatorrent/demos/pi/ApplicationTest.java | 18 +- .../datatorrent/demos/pi/CalculatorTest.java | 5 +- .../demos/r/oldfaithful/FaithfulRScript.java | 16 +- .../demos/r/oldfaithful/InputGenerator.java | 5 +- .../r/oldfaithful/OldFaithfulApplication.java | 5 +- .../oldfaithful/OldFaithfulApplicationTest.java | 2 +- .../twitter/KinesisHashtagsApplication.java | 8 +- .../demos/twitter/SlidingContainer.java | 2 +- .../demos/twitter/TwitterDumpApplication.java | 6 +- .../twitter/TwitterDumpHBaseApplication.java | 8 +- .../twitter/TwitterStatusHashtagExtractor.java | 8 +- .../twitter/TwitterStatusURLExtractor.java | 7 +- .../twitter/TwitterStatusWordExtractor.java | 16 +- .../twitter/TwitterTopCounterApplication.java | 22 +- .../twitter/TwitterTopWordsApplication.java | 6 +- .../TwitterTrendingHashtagsApplication.java | 7 +- .../com/datatorrent/demos/twitter/URLSerDe.java | 10 +- .../demos/twitter/WindowedTopCounter.java | 29 +- .../twitter/TwitterDumpApplicationTest.java | 3 +- .../demos/twitter/TwitterTopCounterTest.java | 5 +- .../demos/twitter/TwitterTopWordsTest.java | 11 +- .../demos/uniquecount/Application.java | 6 +- .../demos/uniquecount/CountVerifier.java | 48 ++- .../demos/uniquecount/RandomDataGenerator.java | 18 +- .../demos/uniquecount/RandomKeyValues.java | 222 ++++++------- .../demos/uniquecount/RandomKeysGenerator.java | 18 +- .../uniquecount/UniqueKeyValCountDemo.java | 7 +- .../demos/uniquecount/ApplicationTest.java | 4 +- .../demos/uniquecount/UniqueKeyValDemoTest.java | 4 +- .../demos/wordcount/Application.java | 19 +- .../wordcount/ApplicationWithQuerySupport.java | 33 +- .../demos/wordcount/FileWordCount.java | 45 +-- .../datatorrent/demos/wordcount/LineReader.java | 6 +- .../com/datatorrent/demos/wordcount/WCPair.java | 17 +- .../demos/wordcount/WindowWordCount.java | 4 +- .../demos/wordcount/WordCountInputOperator.java | 128 ++++---- .../demos/wordcount/WordCountWriter.java | 10 +- .../datatorrent/demos/wordcount/WordReader.java | 14 +- .../demos/wordcount/ApplicationTest.java | 14 +- .../yahoofinance/ApplicationWithDerbySQL.java | 15 +- .../demos/yahoofinance/StockTickInput.java | 50 +-- .../yahoofinance/YahooFinanceApplication.java | 9 +- .../YahooFinanceCSVInputOperator.java | 28 +- .../demos/yahoofinance/ApplicationTest.java | 5 +- .../ApplicationWithDerbySQLTest.java | 12 +- 144 files changed, 2238 insertions(+), 2102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java index d085744..417ed7c 100644 --- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java +++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/CountVerifier.java @@ -39,7 +39,8 @@ public class CountVerifier implements Operator Map<Integer, Integer> trueCount = new HashMap<Integer, Integer>(); Map<Integer, Integer> receivedCount = new HashMap<Integer, Integer>(); - public transient final DefaultInputPort<KeyValPair<Integer, Integer>> trueIn = new DefaultInputPort<KeyValPair<Integer, Integer>>() { + public final transient DefaultInputPort<KeyValPair<Integer, Integer>> trueIn = new DefaultInputPort<KeyValPair<Integer, Integer>>() + { @Override public void process(KeyValPair<Integer, Integer> tuple) { @@ -47,7 +48,8 @@ public class CountVerifier implements Operator } }; - public transient final DefaultInputPort<KeyValPair<Integer, Integer>> recIn = new DefaultInputPort<KeyValPair<Integer, Integer>>() { + public final transient DefaultInputPort<KeyValPair<Integer, Integer>> recIn = new DefaultInputPort<KeyValPair<Integer, Integer>>() + { @Override public void process(KeyValPair<Integer, Integer> tuple) { @@ -56,9 +58,9 @@ public class CountVerifier implements Operator }; @OutputPortFieldAnnotation(optional = true) - public transient final DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>(); + public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>(); @OutputPortFieldAnnotation(optional = true) - public transient final DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>(); + public final transient DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>(); @Override public void setup(OperatorContext arg0) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java index 28baf06..bf8a307 100644 --- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java +++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/IntegerUniqueValueCountAppender.java @@ -42,38 +42,39 @@ public class IntegerUniqueValueCountAppender extends UniqueValueCountAppender<In { Set<Integer> valSet = new HashSet<Integer>(); try { - while (resultSet.next()) + while (resultSet.next()) { valSet.add(resultSet.getInt(1)); + } return valSet; } catch (SQLException e) { throw new RuntimeException("while processing the result set", e); } } - + @Override protected void prepareGetStatement(PreparedStatement getStatement, Object key) throws SQLException { - getStatement.setInt(1, (Integer) key); + getStatement.setInt(1, (Integer)key); } @Override protected void preparePutStatement(PreparedStatement putStatement, Object key, Object value) throws SQLException { @SuppressWarnings("unchecked") - Set<Integer> valueSet = (Set<Integer>) value; + Set<Integer> valueSet = (Set<Integer>)value; for (Integer val : valueSet) { @SuppressWarnings("unchecked") - Set<Integer> currentVals = (Set<Integer>) get(key); + Set<Integer> currentVals = (Set<Integer>)get(key); if (!currentVals.contains(val)) { batch = true; - putStatement.setInt(1, (Integer) key); + putStatement.setInt(1, (Integer)key); putStatement.setInt(2, val); putStatement.setLong(3, windowID); putStatement.addBatch(); } } } - + @Override public void endWindow() { @@ -84,7 +85,7 @@ public class IntegerUniqueValueCountAppender extends UniqueValueCountAppender<In while (resultSet.next()) { int val = resultSet.getInt(1); @SuppressWarnings("unchecked") - Set<Integer> valSet = (Set<Integer>) cacheManager.get(val); + Set<Integer> valSet = (Set<Integer>)cacheManager.get(val); output.emit(new KeyValPair<Object, Object>(val, valSet.size())); } } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java index bb12063..c8016da 100644 --- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java +++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/RandomKeyValGenerator.java @@ -24,7 +24,8 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import org.slf4j.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; @@ -75,8 +76,9 @@ public class RandomKeyValGenerator implements InputOperator verport.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), e.getValue().size())); } } - if(clearHistory) + if (clearHistory) { valhistory.clear(); + } } @Override @@ -130,7 +132,7 @@ public class RandomKeyValGenerator implements InputOperator /** * Sets the number of possible keys to numKeys - * + * * @param numKeys * the new number of possible keys */ @@ -141,7 +143,7 @@ public class RandomKeyValGenerator implements InputOperator /** * Returns the number of possible values that can be emitted - * + * * @return the number of possible values that can be emitted */ public int getNumVals() @@ -151,7 +153,7 @@ public class RandomKeyValGenerator implements InputOperator /** * Sets the number of possible values that can be emitted to numVals - * + * * @param numVals * the number of possible values that can be emitted */ @@ -162,7 +164,7 @@ public class RandomKeyValGenerator implements InputOperator /** * Sets the number of KeyValPairs to be emitted to tupleBlast - * + * * @param tupleBlast * the new number of KeyValPairs to be emitted */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java index 7c91f77..3f14e0f 100644 --- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java +++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java @@ -29,19 +29,18 @@ import java.util.Set; import javax.annotation.Nonnull; import javax.validation.constraints.Min; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.lib.algo.UniqueValueCount; -import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput; -import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Partitioner; +import com.datatorrent.lib.algo.UniqueValueCount; +import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput; +import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator; import com.datatorrent.netlet.util.DTThrowable; /** @@ -106,8 +105,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO deleteStatement.executeUpdate(); } } - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException(e); } } @@ -118,7 +116,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO Object key = getKeyFromTuple(tuple); @SuppressWarnings("unchecked") - Set<Object> values = (Set<Object>) cacheManager.get(key); + Set<Object> values = (Set<Object>)cacheManager.get(key); if (values == null) { values = Sets.newHashSet(); } @@ -154,8 +152,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO putStatement.executeBatch(); putStatement.clearBatch(); } - } - catch (SQLException e) { + } catch (SQLException e) { throw new RuntimeException("while executing insert", e); } } @@ -210,8 +207,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO UniqueValueCountAppender<V> statefulUniqueCount = this.getClass().newInstance(); DefaultPartition<UniqueValueCountAppender<V>> partition = new DefaultPartition<UniqueValueCountAppender<V>>(statefulUniqueCount); newPartitions.add(partition); - } - catch (Throwable cause) { + } catch (Throwable cause) { DTThrowable.rethrow(cause); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java index 3ebfcb1..ef5473f 100644 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java +++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/ApplicationTest.java @@ -21,7 +21,6 @@ package com.datatorrent.demos.distributeddistinct; import org.junit.Test; import com.datatorrent.api.LocalMode; -import com.datatorrent.demos.distributeddistinct.Application; public class ApplicationTest { @@ -30,4 +29,4 @@ public class ApplicationTest { LocalMode.runApp(new Application(), 15000); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java index d32047a..e013217 100644 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java +++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java @@ -48,8 +48,8 @@ public class DistributedDistinctTest { private static final Logger logger = LoggerFactory.getLogger(DistributedDistinctTest.class); - private final static String APP_ID = "DistributedDistinctTest"; - private final static int OPERATOR_ID = 0; + private static final String APP_ID = "DistributedDistinctTest"; + private static final int OPERATOR_ID = 0; public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver"; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java index ee7c1f1..57ac964 100644 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java +++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulApplicationTest.java @@ -24,18 +24,19 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.LocalMode; -import com.datatorrent.demos.distributeddistinct.StatefulApplication; public class StatefulApplicationTest { - + @BeforeClass - public static void setup(){ + public static void setup() + { try { Class.forName(StatefulUniqueCountTest.INMEM_DB_DRIVER).newInstance(); Connection con = DriverManager.getConnection(StatefulUniqueCountTest.INMEM_DB_URL, new Properties()); @@ -51,27 +52,27 @@ public class StatefulApplicationTest throw new RuntimeException(e); } } - + @Test public void testApplication() throws Exception { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); - conf.set("dt.operator.StatefulUniqueCounter.prop.tableName", "Test_Lookup_Cache"); + conf.set("dt.operator.StatefulUniqueCounter.prop.tableName", "Test_Lookup_Cache"); conf.set("dt.operator.StatefulUniqueCounter.prop.store.dbUrl", "jdbc:hsqldb:mem:test;sql.syntax_mys=true"); conf.set("dt.operator.StatefulUniqueCounter.prop.store.dbDriver", "org.hsqldb.jdbcDriver"); - + lma.prepareDAG(new StatefulApplication(), conf); lma.cloneDAG(); LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false); lc.runAsync(); - + long now = System.currentTimeMillis(); while (System.currentTimeMillis() - now < 15000) { Thread.sleep(1000); } - + lc.shutdown(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java index 55f1c8e..a1ac603 100644 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java +++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java @@ -18,20 +18,29 @@ */ package com.datatorrent.demos.distributeddistinct; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import com.datatorrent.api.*; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.distributeddistinct.IntegerUniqueValueCountAppender; import com.datatorrent.lib.algo.UniqueValueCount; import com.datatorrent.lib.util.KeyValPair; @@ -96,7 +105,8 @@ public class StatefulUniqueCountTest private static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver"; protected static final String TABLE_NAME = "Test_Lookup_Cache"; - public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { @Override public void process(Object tuple) { @@ -196,7 +206,8 @@ public class StatefulUniqueCountTest } @BeforeClass - public static void setup(){ + public static void setup() + { try { Class.forName(INMEM_DB_DRIVER).newInstance(); Connection con = DriverManager.getConnection(INMEM_DB_URL, new Properties()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java index 74a8f99..90a3fd2 100644 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java +++ b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java @@ -27,7 +27,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; /** * @since 2.1.0 */ -@ApplicationAnnotation(name="EchoServer") +@ApplicationAnnotation(name = "EchoServer") public class Application implements StreamingApplication { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java index 7b8b423..ce7a1bc 100644 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java +++ b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java @@ -23,9 +23,9 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; /** * @since 2.1.0 @@ -40,7 +40,7 @@ public class MessageResponder extends BaseOperator private transient NetworkManager.ChannelAction<DatagramChannel> action; private transient ByteBuffer buffer; - public transient final DefaultInputPort<Message> messageInput = new DefaultInputPort<Message>() + public final transient DefaultInputPort<Message> messageInput = new DefaultInputPort<Message>() { @Override public void process(Message message) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java index a89e09d..056068f 100644 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java +++ b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java @@ -23,8 +23,15 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; -import java.nio.channels.*; -import java.util.*; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; @@ -37,7 +44,11 @@ public class NetworkManager implements Runnable { private static final Logger logger = LoggerFactory.getLogger(NetworkManager.class); - public static enum ConnectionType { TCP, UDP }; + public static enum ConnectionType + { + TCP, + UDP + } private static NetworkManager _instance; private Selector selector; @@ -180,36 +191,48 @@ public class NetworkManager implements Runnable } } - public static interface ChannelListener<T extends SelectableChannel> { + public static interface ChannelListener<T extends SelectableChannel> + { public void ready(ChannelAction<T> action, int readyOps); } - public static class ChannelConfiguration<T extends SelectableChannel> { + public static class ChannelConfiguration<T extends SelectableChannel> + { public T channel; public ConnectionInfo connectionInfo; public Collection<ChannelAction> actions; } - public static class ChannelAction<T extends SelectableChannel> { + public static class ChannelAction<T extends SelectableChannel> + { public ChannelConfiguration<T> channelConfiguration; public ChannelListener<T> listener; public int ops; } - private static class ConnectionInfo { + private static class ConnectionInfo + { public SocketAddress address; public ConnectionType connectionType; @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } - ConnectionInfo that = (ConnectionInfo) o; + ConnectionInfo that = (ConnectionInfo)o; - if (connectionType != that.connectionType) return false; - if (!address.equals(that.address)) return false; + if (connectionType != that.connectionType) { + return false; + } + if (!address.equals(that.address)) { + return false; + } return true; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java b/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java index a25e47f..8c52a9d 100644 --- a/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java +++ b/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java @@ -32,10 +32,12 @@ import com.datatorrent.api.LocalMode; /** * Test the DAG declaration in local mode. */ -public class ApplicationTest { +public class ApplicationTest +{ @Test - public void testApplication() throws IOException, Exception { + public void testApplication() throws IOException, Exception + { try { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java index c28697e..8d7c325 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java @@ -26,7 +26,6 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.demos.frauddetect.operator.HdfsStringOutputOperator; import com.datatorrent.demos.frauddetect.operator.MongoDBOutputOperator; import com.datatorrent.lib.io.ConsoleOutputOperator; @@ -36,6 +35,7 @@ import com.datatorrent.lib.math.RangeKeyVal; import com.datatorrent.lib.multiwindow.SimpleMovingAverage; import com.datatorrent.lib.util.BaseKeyValueOperator; import com.datatorrent.lib.util.KeyValPair; +import com.datatorrent.netlet.util.DTThrowable; /** @@ -43,11 +43,10 @@ import com.datatorrent.lib.util.KeyValPair; * * @since 0.9.0 */ -@ApplicationAnnotation(name="FraudDetectDemo") +@ApplicationAnnotation(name = "FraudDetectDemo") public class Application implements StreamingApplication { - public PubSubWebSocketInputOperator getPubSubWebSocketInputOperator(String name, DAG dag, URI duri, String topic) throws Exception { PubSubWebSocketInputOperator reqin = dag.addOperator(name, new PubSubWebSocketInputOperator()); @@ -78,7 +77,8 @@ public class Application implements StreamingApplication return oper; } - public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable { + public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable + { private static final long serialVersionUID = 201410031623L; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java index 5fb665c..b813a40 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java @@ -18,23 +18,25 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.demos.frauddetect.util.JsonUtils; -import org.apache.commons.lang.mutable.MutableDouble; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.validation.constraints.NotNull; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.validation.constraints.NotNull; + +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableDouble; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.demos.frauddetect.util.JsonUtils; +import com.datatorrent.lib.util.KeyValPair; /** * Generate an alert if the current transaction amount received on tx input port for the given key is greater by n % @@ -45,8 +47,8 @@ import java.util.Map; public class AverageAlertingOperator extends BaseOperator { private static final Logger Log = LoggerFactory.getLogger(AverageAlertingOperator.class); - private transient final JsonFactory jsonFactory = new JsonFactory(); - private transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); + private final transient JsonFactory jsonFactory = new JsonFactory(); + private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); private Map<MerchantKey, MutableDouble> lastSMAMap = new HashMap<MerchantKey, MutableDouble>(); private Map<MerchantKey, MutableDouble> currentSMAMap = new HashMap<MerchantKey, MutableDouble>(); private List<AverageAlertData> alerts = new ArrayList<AverageAlertData>(); @@ -57,7 +59,7 @@ public class AverageAlertingOperator extends BaseOperator public final transient DefaultOutputPort<String> avgAlertOutputPort = new DefaultOutputPort<String>(); public final transient DefaultOutputPort<Map<String, Object>> avgAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>(); public final transient DefaultInputPort<KeyValPair<MerchantKey, Double>> smaInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, Double>>() + new DefaultInputPort<KeyValPair<MerchantKey, Double>>() { @Override public void process(KeyValPair<MerchantKey, Double> tuple) @@ -67,8 +69,7 @@ public class AverageAlertingOperator extends BaseOperator double sma = tuple.getValue(); currentSMAMap.put(tuple.getKey(), new MutableDouble(sma)); //lastSMAMap.put(tuple.getKey(), new MutableDouble(sma)); - } - else { // move the current SMA value to the last SMA Map + } else { // move the current SMA value to the last SMA Map //lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue()); currentSma.setValue(tuple.getValue()); // update the current SMA value } @@ -76,7 +77,7 @@ public class AverageAlertingOperator extends BaseOperator }; public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> txInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, Long>>() + new DefaultInputPort<KeyValPair<MerchantKey, Long>>() { @Override public void process(KeyValPair<MerchantKey, Long> tuple) @@ -100,8 +101,7 @@ public class AverageAlertingOperator extends BaseOperator //if (userGenerated) { // if its user generated only the pass it to WebSocket if (merchantKey.merchantType == MerchantTransaction.MerchantType.BRICK_AND_MORTAR) { avgAlertNotificationPort.emit(getOutputData(data, String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId, merchantKey.terminalId))); - } - else { // its internet based + } else { // its internet based avgAlertNotificationPort.emit(getOutputData(data, String.format(internetAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId))); } @@ -116,8 +116,7 @@ public class AverageAlertingOperator extends BaseOperator for (AverageAlertData data : alerts) { try { avgAlertOutputPort.emit(JsonUtils.toJson(data)); - } - catch (IOException e) { + } catch (IOException e) { logger.warn("Exception while converting object to JSON", e); } } @@ -131,8 +130,7 @@ public class AverageAlertingOperator extends BaseOperator if (lastSma == null) { lastSma = new MutableDouble(currentSma.doubleValue()); lastSMAMap.put(key, lastSma); - } - else { + } else { lastSma.setValue(currentSma.getValue()); } } @@ -167,8 +165,7 @@ public class AverageAlertingOperator extends BaseOperator try { String str = mapper.writeValueAsString(output); logger.debug("user generated tx alert: " + str); - } - catch (Exception exc) { + } catch (Exception exc) { //ignore } return output; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java index df0aa7e..87cf043 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java @@ -18,10 +18,10 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.lib.util.TimeBucketKey; - import java.io.Serializable; +import com.datatorrent.lib.util.TimeBucketKey; + /** * Bank Id Number Key * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java index 642c702..abfa202 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java @@ -18,23 +18,26 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.demos.frauddetect.util.JsonUtils; -import org.apache.commons.lang.mutable.MutableLong; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableLong; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.demos.frauddetect.util.JsonUtils; +import com.datatorrent.lib.util.KeyValPair; + /** * Count the transactions for the underlying aggregation window if the same BIN is * being used for more than defined number of transactions. Output the data as needed @@ -44,19 +47,19 @@ import org.codehaus.jackson.map.ObjectMapper; */ public class BankIdNumberSamplerOperator extends BaseOperator { - private transient final JsonFactory jsonFactory = new JsonFactory(); - private transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); + private final transient JsonFactory jsonFactory = new JsonFactory(); + private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); private int threshold; private Map<MerchantKey, Map<String, BankIdNumData>> bankIdNumCountMap = new HashMap<MerchantKey, Map<String, BankIdNumData>>(); private static final String ALERT_MSG = - "Potential fraudulent CC transactions (same bank id %s and merchant %s) total transactions: %d"; + "Potential fraudulent CC transactions (same bank id %s and merchant %s) total transactions: %d"; /** * Output the key-value pair for the BIN as key with the count as value. */ public final transient DefaultOutputPort<String> countAlertOutputPort = - new DefaultOutputPort<String>(); + new DefaultOutputPort<String>(); public final transient DefaultOutputPort<Map<String, Object>> countAlertNotificationPort = - new DefaultOutputPort<Map<String, Object>>(); + new DefaultOutputPort<Map<String, Object>>(); public int getThreshold() { @@ -103,7 +106,7 @@ public class BankIdNumberSamplerOperator extends BaseOperator */ public final transient DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> txCountInputPort = - new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>() + new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>() { @Override public void process(KeyValPair<KeyValPair<MerchantKey, String>, Integer> tuple) @@ -162,8 +165,7 @@ public class BankIdNumberSamplerOperator extends BaseOperator try { countAlertOutputPort.emit(JsonUtils.toJson(data)); countAlertNotificationPort.emit(getOutputData(data)); - } - catch (IOException e) { + } catch (IOException e) { logger.warn("Exception while converting object to JSON: ", e); } } @@ -196,15 +198,14 @@ public class BankIdNumberSamplerOperator extends BaseOperator try { String str = mapper.writeValueAsString(output); logger.debug("user generated tx alert: " + str); - } - catch (Exception exc) { + } catch (Exception exc) { //ignore } return output; } - public final static class BankIdNumData + public static final class BankIdNumData { public String bankIdNum; public MutableLong count = new MutableLong(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java index d728306..a232fd4 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAmountSamplerOperator.java @@ -18,19 +18,26 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.demos.frauddetect.util.JsonUtils; -import org.apache.commons.lang.mutable.MutableLong; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.map.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; +import org.apache.commons.lang.mutable.MutableLong; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.demos.frauddetect.util.JsonUtils; +import com.datatorrent.lib.util.KeyValPair; /** * An operator to alert in case a transaction of a small lowAmount is followed by a transaction which is significantly larger for a given credit card number. @@ -42,8 +49,8 @@ import java.util.*; */ public class CreditCardAmountSamplerOperator extends BaseOperator { - private transient final JsonFactory jsonFactory = new JsonFactory(); - private transient final ObjectMapper mapper = new ObjectMapper(jsonFactory); + private final transient JsonFactory jsonFactory = new JsonFactory(); + private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); private static final Logger logger = LoggerFactory.getLogger(Application.class); // Factor to be applied to existing lowAmount to flag potential alerts. private double threshold = 9500; @@ -52,7 +59,7 @@ public class CreditCardAmountSamplerOperator extends BaseOperator private List<CreditCardAlertData> alerts = new ArrayList<CreditCardAlertData>(); //private List<CreditCardAlertData> userAlerts = new ArrayList<CreditCardAlertData>(); private static final String ALERT_MSG = - "Potential fraudulent CC transactions (small one USD %d followed by large USD %d) performed using credit card: %s"; + "Potential fraudulent CC transactions (small one USD %d followed by large USD %d) performed using credit card: %s"; public final transient DefaultOutputPort<String> ccAlertOutputPort = new DefaultOutputPort<String>(); /* public final transient DefaultOutputPort<Map<String, Object>> ccUserAlertOutputPort = new DefaultOutputPort<Map<String, Object>>(); @@ -82,8 +89,7 @@ public class CreditCardAmountSamplerOperator extends BaseOperator if (ccAmount < currentSmallValue) { cardInfo.lowAmount.setValue(ccAmount); cardInfo.time = key.time; - } - else if (ccAmount > (currentSmallValue + threshold)) { + } else if (ccAmount > (currentSmallValue + threshold)) { // If the transaction lowAmount is > 70% of the min. lowAmount, send an alert. CreditCardAlertData data = new CreditCardAlertData(); @@ -113,8 +119,7 @@ public class CreditCardAmountSamplerOperator extends BaseOperator // alert not resetting the low value from a user generated transaction //txMap.remove(fullCcNum); } - } - else { + } else { cardInfo = new CreditCardInfo(); cardInfo.lowAmount.setValue(ccAmount); cardInfo.time = key.time; @@ -123,7 +128,7 @@ public class CreditCardAmountSamplerOperator extends BaseOperator } public transient DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>> inputPort = - new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>() + new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>() { // // This function checks if a CC entry exists. @@ -147,8 +152,7 @@ public class CreditCardAmountSamplerOperator extends BaseOperator for (CreditCardAlertData data : alerts) { try { ccAlertOutputPort.emit(JsonUtils.toJson(data)); - } - catch (IOException e) { + } catch (IOException e) { logger.warn("Exception while converting object to JSON", e); } } @@ -192,8 +196,7 @@ public class CreditCardAmountSamplerOperator extends BaseOperator try { String str = mapper.writeValueAsString(output); logger.debug("Alert generated: " + str + " userGenerated: " + data.userGenerated); - } - catch (Exception exc) { + } catch (Exception exc) { //ignore } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java index 7347790..75e279c 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransaction.java @@ -30,12 +30,12 @@ public class MerchantTransaction implements Serializable public enum MerchantType { UNDEFINED, BRICK_AND_MORTAR, INTERNET - }; + } public enum TransactionType { UNDEFINED, POS - }; + } public String ccNum; public String bankIdNum; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java index 67ab2e9..415b7be 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionBucketOperator.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,19 +18,21 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.KeyValPair; import java.text.DecimalFormat; +import java.util.HashMap; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.lang.mutable.MutableDouble; import org.apache.commons.lang.mutable.MutableLong; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + /** * A bucket-like operator to accept merchant transaction object and dissipate the * transaction amount to the further downstream operator for calculating min, max and std-deviation. @@ -45,13 +47,13 @@ public class MerchantTransactionBucketOperator extends BaseOperator new DefaultOutputPort<KeyValPair<MerchantKey, String>>(); */ public final transient DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> binCountOutputPort = - new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>(); + new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>(); public final transient DefaultOutputPort<KeyValPair<MerchantKey, Long>> txOutputPort = - new DefaultOutputPort<KeyValPair<MerchantKey, Long>>(); + new DefaultOutputPort<KeyValPair<MerchantKey, Long>>(); public final transient DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> ccAlertOutputPort = - new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>(); + new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>(); public final transient DefaultOutputPort<Map<String, Object>> summaryTxnOutputPort = - new DefaultOutputPort<Map<String, Object>>(); + new DefaultOutputPort<Map<String, Object>>(); private MutableLong totalTxns = new MutableLong(0); private MutableLong txnsInLastSecond = new MutableLong(0); private MutableDouble amtInLastSecond = new MutableDouble(0); @@ -75,26 +77,26 @@ public class MerchantTransactionBucketOperator extends BaseOperator }; - public void endWindow() { - Map<String, Object> summary = new HashMap<String, Object>(); - double avg; - if (txnsInLastSecond.longValue() == 0) { - avg = 0; - } else { - avg = amtInLastSecond.doubleValue() / txnsInLastSecond.longValue(); - } - summary.put("totalTxns", totalTxns); - summary.put("txnsInLastSecond", txnsInLastSecond); - summary.put("amtInLastSecond", amtFormatter.format(amtInLastSecond)); - summary.put("avgAmtInLastSecond", amtFormatter.format(avg)); - summaryTxnOutputPort.emit(summary); - txnsInLastSecond.setValue(0); - amtInLastSecond.setValue(0); + public void endWindow() + { + Map<String, Object> summary = new HashMap<String, Object>(); + double avg; + if (txnsInLastSecond.longValue() == 0) { + avg = 0; + } else { + avg = amtInLastSecond.doubleValue() / txnsInLastSecond.longValue(); + } + summary.put("totalTxns", totalTxns); + summary.put("txnsInLastSecond", txnsInLastSecond); + summary.put("amtInLastSecond", amtFormatter.format(amtInLastSecond)); + summary.put("avgAmtInLastSecond", amtFormatter.format(avg)); + summaryTxnOutputPort.emit(summary); + txnsInLastSecond.setValue(0); + amtInLastSecond.setValue(0); } private void processTuple(MerchantTransaction tuple) { - //emitBankIdNumTuple(tuple, binOutputPort); emitBankIdNumTuple(tuple, binCountOutputPort); emitMerchantKeyTuple(tuple, txOutputPort); emitCreditCardKeyTuple(tuple, ccAlertOutputPort); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java index 5f9b7ee..49b61aa 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionGenerator.java @@ -19,12 +19,16 @@ package com.datatorrent.demos.frauddetect; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.*; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.demos.frauddetect.util.JsonUtils; @@ -37,8 +41,8 @@ import com.datatorrent.demos.frauddetect.util.JsonUtils; public class MerchantTransactionGenerator extends BaseOperator implements InputOperator { private final Random randomNum = new Random(); - public static final int zipCodes[] = {94086, 94087, 94088, 94089, 94090, 94091, 94092, 94093}; - public static final String merchantIds[] = {"Wal-Mart", "Target", "Amazon", "Apple", "Sears", "Macys", "JCPenny", "Levis"}; + public static final int[] zipCodes = {94086, 94087, 94088, 94089, 94090, 94091, 94092, 94093}; + public static final String[] merchantIds = {"Wal-Mart", "Target", "Amazon", "Apple", "Sears", "Macys", "JCPenny", "Levis"}; // public static final String bankIdNums[] = { "1111 1111 1111", "2222 2222 2222", "3333 3333 3333", "4444 4444 4444", "5555 5555 5555", "6666 6666 6666", "7777 7777 7777", "8888 8888 8888"}; // public static final String ccNums[] = { "0001", "0002", "0003", "0004", "0005", "0006", "0007", "0008"}; // public static final String bankIdNums[] = { "1111 1111 1111", "2222 2222 2222", "3333 3333 3333", "4444 4444 4444"}; @@ -67,9 +71,9 @@ public class MerchantTransactionGenerator extends BaseOperator implements InputO } public transient DefaultOutputPort<MerchantTransaction> txOutputPort = - new DefaultOutputPort<MerchantTransaction>(); + new DefaultOutputPort<MerchantTransaction>(); public transient DefaultOutputPort<String> txDataOutputPort = - new DefaultOutputPort<String>(); + new DefaultOutputPort<String>(); @Override public void emitTuples() @@ -127,8 +131,7 @@ public class MerchantTransactionGenerator extends BaseOperator implements InputO for (MerchantTransaction txData : txList) { try { txDataOutputPort.emit(JsonUtils.toJson(txData)); - } - catch (IOException e) { + } catch (IOException e) { logger.warn("Exception while converting object to JSON", e); } } @@ -136,8 +139,7 @@ public class MerchantTransactionGenerator extends BaseOperator implements InputO try { Thread.sleep(100); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java index 0af8836..cdc829d 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/MerchantTransactionInputHandler.java @@ -18,13 +18,14 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; /** * Common utility class that can be used by all other operators to handle user input @@ -41,7 +42,7 @@ public class MerchantTransactionInputHandler extends BaseOperator public static final String KEY_ZIP_CODE = "zipCode"; public static final String KEY_AMOUNT = "amount"; public transient DefaultOutputPort<MerchantTransaction> txOutputPort = - new DefaultOutputPort<MerchantTransaction>(); + new DefaultOutputPort<MerchantTransaction>(); public transient DefaultInputPort<Map<String, String>> userTxInputPort = new DefaultInputPort<Map<String, String>>() { @Override @@ -49,8 +50,7 @@ public class MerchantTransactionInputHandler extends BaseOperator { try { txOutputPort.emit(processInput(tuple)); - } - catch (Exception exc) { + } catch (Exception exc) { logger.error("Exception while handling the input", exc); } } @@ -86,8 +86,7 @@ public class MerchantTransactionInputHandler extends BaseOperator } } - if (bankIdNum == null || ccNum == null || merchantId == null - || terminalId == null || zipCode == null || amount == null) { + if (bankIdNum == null || ccNum == null || merchantId == null || terminalId == null || zipCode == null || amount == null) { throw new IllegalArgumentException("Missing required input!"); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java index 056967b..2701c14 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumKeyVal.java @@ -20,11 +20,12 @@ package com.datatorrent.demos.frauddetect; import java.util.ArrayList; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + import com.datatorrent.lib.multiwindow.AbstractSlidingWindowKeyVal; import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; /** * Sliding window sum operator @@ -34,27 +35,27 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSlidingWindowKeyVal<K, V, SlidingWindowSumObject> { - /** - * Output port to emit simple moving average (SMA) of last N window as Double. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSum = new DefaultOutputPort<KeyValPair<K, Double>>(); - /** - * Output port to emit simple moving average (SMA) of last N window as Float. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum = new DefaultOutputPort<KeyValPair<K, Float>>(); - /** - * Output port to emit simple moving average (SMA) of last N window as Long. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = new DefaultOutputPort<KeyValPair<K, Long>>(); - /** - * Output port to emit simple moving average (SMA) of last N window as - * Integer. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSum = new DefaultOutputPort<KeyValPair<K, Integer>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as Double. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSum = new DefaultOutputPort<KeyValPair<K, Double>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as Float. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum = new DefaultOutputPort<KeyValPair<K, Float>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as Long. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = new DefaultOutputPort<KeyValPair<K, Long>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as + * Integer. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSum = new DefaultOutputPort<KeyValPair<K, Integer>>(); @Override @@ -69,7 +70,6 @@ public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSliding } buffer.put(key, stateList); } - SlidingWindowSumObject state = stateList.get(currentstate); state.add(tuple.getValue()); } @@ -78,7 +78,7 @@ public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSliding public void emitTuple(K key, ArrayList<SlidingWindowSumObject> obj) { double sum = 0; - for (int i=0; i < obj.size(); ++i) { + for (int i = 0; i < obj.size(); ++i) { SlidingWindowSumObject state = obj.get(i); sum += state.getSum(); } @@ -86,13 +86,13 @@ public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSliding doubleSum.emit(new KeyValPair<K, Double>(key, sum)); } if (floatSum.isConnected()) { - floatSum.emit(new KeyValPair<K, Float>(key, (float) sum)); + floatSum.emit(new KeyValPair<K, Float>(key, (float)sum)); } if (longSum.isConnected()) { - longSum.emit(new KeyValPair<K, Long>(key, (long) sum)); + longSum.emit(new KeyValPair<K, Long>(key, (long)sum)); } if (integerSum.isConnected()) { - integerSum.emit(new KeyValPair<K, Integer>(key, (int) sum)); + integerSum.emit(new KeyValPair<K, Integer>(key, (int)sum)); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java index 2075165..3fefb66 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/SlidingWindowSumObject.java @@ -33,12 +33,14 @@ public class SlidingWindowSumObject extends SimpleMovingAverageObject MutableDouble sum = new MutableDouble(0); - public void add(Number n) { + public void add(Number n) + { sum.add(n); } @Override - public double getSum() { + public double getSum() + { return sum.doubleValue(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java index 4a90618..e226af0 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/TransactionStatsAggregator.java @@ -18,18 +18,19 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.common.util.BaseOperator; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.demos.frauddetect.util.JsonUtils; import com.datatorrent.lib.util.HighLow; import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.demos.frauddetect.util.JsonUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; /** * Operator to aggregate the min, max, sma, std-dev and variance for the given key. @@ -39,10 +40,10 @@ import java.util.Map; public class TransactionStatsAggregator extends BaseOperator { public Map<MerchantKey, TransactionStatsData> aggrgateMap = - new HashMap<MerchantKey, TransactionStatsData>(); + new HashMap<MerchantKey, TransactionStatsData>(); public final transient DefaultOutputPort<String> txDataOutputPort = new DefaultOutputPort<String>(); public final transient DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>> rangeInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>() + new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>() { @Override public void process(KeyValPair<MerchantKey, HighLow<Long>> tuple) @@ -55,7 +56,7 @@ public class TransactionStatsAggregator extends BaseOperator }; public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> smaInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, Long>>() + new DefaultInputPort<KeyValPair<MerchantKey, Long>>() { @Override public void process(KeyValPair<MerchantKey, Long> tuple) @@ -87,8 +88,7 @@ public class TransactionStatsAggregator extends BaseOperator for (Map.Entry<MerchantKey, TransactionStatsData> entry : aggrgateMap.entrySet()) { try { txDataOutputPort.emit(JsonUtils.toJson(entry.getValue())); - } - catch (IOException e) { + } catch (IOException e) { logger.warn("Exception while converting object to JSON", e); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java index 86fa921..4b8f851 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/HdfsStringOutputOperator.java @@ -18,12 +18,12 @@ */ package com.datatorrent.demos.frauddetect.operator; +import java.io.File; + import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; -import java.io.File; - /** * Adapter for writing Strings to HDFS * <p> @@ -65,8 +65,7 @@ public class HdfsStringOutputOperator extends AbstractFileOutputOperator<String> } @Override - public String getPartFileName(String fileName, - int part) + public String getPartFileName(String fileName, int part) { return fileName + part; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java index a4a2775..0171c00 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/operator/MongoDBOutputOperator.java @@ -18,146 +18,171 @@ */ package com.datatorrent.demos.frauddetect.operator; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.Context; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; import javax.validation.constraints.NotNull; -import com.datatorrent.api.DefaultInputPort; -import com.mongodb.*; -import com.mongodb.util.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; +import com.mongodb.util.JSON; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; + /** * Operator to write data into MongoDB * * @since 0.9.0 */ -public class MongoDBOutputOperator extends BaseOperator { - - @NotNull - protected String hostName; - @NotNull - protected String dataBase; - @NotNull - protected String collection; - - protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED; - - protected String userName; - protected String passWord; - - protected transient MongoClient mongoClient; - protected transient DB db; - protected transient DBCollection dbCollection; - - protected List<DBObject> dataList = new ArrayList<DBObject>(); - - public MongoDBOutputOperator() { - } - - /** - * Take the JSON formatted string and convert it to DBObject - */ - public transient final DefaultInputPort<String> inputPort = new DefaultInputPort<String>() { - @Override - public void process(String tuple) { - dataList.add((DBObject)JSON.parse(tuple)); - } - }; - - @Override - public void setup(Context.OperatorContext context) { - super.setup(context); - try { - mongoClient = new MongoClient(hostName); - db = mongoClient.getDB(dataBase); - if (userName != null && passWord != null) { - if (!db.authenticate(userName, passWord.toCharArray())) { - throw new IllegalArgumentException("MongoDB authentication failed. Illegal username and password for MongoDB!!"); - } - } - dbCollection = db.getCollection(collection); - } - catch (UnknownHostException ex) { - logger.debug(ex.toString()); - } - } - - @Override - public void beginWindow(long windowId) { - // nothing - } - - @Override - public void endWindow() { - logger.debug("mongo datalist size: " + dataList.size()); - if (dataList.size() > 0) { - WriteResult result = dbCollection.insert(dataList, writeConcern); - logger.debug("Result for MongoDB insert: " + result); - dataList.clear(); - } - } - +public class MongoDBOutputOperator extends BaseOperator +{ + @NotNull + protected String hostName; + @NotNull + protected String dataBase; + @NotNull + protected String collection; + + protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED; + + protected String userName; + protected String passWord; + + protected transient MongoClient mongoClient; + protected transient DB db; + protected transient DBCollection dbCollection; + + protected List<DBObject> dataList = new ArrayList<DBObject>(); + + public MongoDBOutputOperator() + { + } + + /** + * Take the JSON formatted string and convert it to DBObject + */ + public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() + { @Override - public void teardown() { - if (mongoClient != null) { - mongoClient.close(); + public void process(String tuple) + { + dataList.add((DBObject)JSON.parse(tuple)); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + try { + mongoClient = new MongoClient(hostName); + db = mongoClient.getDB(dataBase); + if (userName != null && passWord != null) { + if (!db.authenticate(userName, passWord.toCharArray())) { + throw new IllegalArgumentException("MongoDB authentication failed. Illegal username and password for MongoDB!!"); } - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - public String getDataBase() { - return dataBase; - } - - public void setDataBase(String dataBase) { - this.dataBase = dataBase; - } - - public String getCollection() { - return collection; - } - - public void setCollection(String collection) { - this.collection = collection; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public String getPassWord() { - return passWord; - } - - public void setPassWord(String passWord) { - this.passWord = passWord; - } - - public WriteConcern getWriteConcern() { - return writeConcern; - } - - public void setWriteConcern(WriteConcern writeConcern) { - this.writeConcern = writeConcern; - } - - private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class); + } + dbCollection = db.getCollection(collection); + } catch (UnknownHostException ex) { + logger.debug(ex.toString()); + } + } + + @Override + public void beginWindow(long windowId) + { + // nothing + } + + @Override + public void endWindow() + { + logger.debug("mongo datalist size: " + dataList.size()); + if (dataList.size() > 0) { + WriteResult result = dbCollection.insert(dataList, writeConcern); + logger.debug("Result for MongoDB insert: " + result); + dataList.clear(); + } + } + + @Override + public void teardown() + { + if (mongoClient != null) { + mongoClient.close(); + } + } + + public String getHostName() + { + return hostName; + } + + public void setHostName(String hostName) + { + this.hostName = hostName; + } + + public String getDataBase() + { + return dataBase; + } + + public void setDataBase(String dataBase) + { + this.dataBase = dataBase; + } + + public String getCollection() + { + return collection; + } + + public void setCollection(String collection) + { + this.collection = collection; + } + + public String getUserName() + { + return userName; + } + + public void setUserName(String userName) + { + this.userName = userName; + } + + public String getPassWord() + { + return passWord; + } + + public void setPassWord(String passWord) + { + this.passWord = passWord; + } + + public WriteConcern getWriteConcern() + { + return writeConcern; + } + + public void setWriteConcern(WriteConcern writeConcern) + { + this.writeConcern = writeConcern; + } + + private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java index 1eb87ed..60c200f 100644 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java +++ b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/util/JsonUtils.java @@ -18,20 +18,21 @@ */ package com.datatorrent.demos.frauddetect.util; -import org.codehaus.jackson.map.ObjectMapper; - import java.io.IOException; +import org.codehaus.jackson.map.ObjectMapper; + /** * Utility class to deal with JSON and Object * * @since 0.9.0 */ -public class JsonUtils { - - private static final ObjectMapper mapper = new ObjectMapper(); +public class JsonUtils +{ + private static final ObjectMapper mapper = new ObjectMapper(); - public static String toJson(Object obj) throws IOException { - return mapper.writeValueAsString(obj); - } + public static String toJson(Object obj) throws IOException + { + return mapper.writeValueAsString(obj); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java b/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java index b54ee83..ef1f371 100644 --- a/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java +++ b/demos/frauddetect/src/test/java/com/datatorrent/demos/frauddetect/FrauddetectApplicationTest.java @@ -18,33 +18,33 @@ */ package com.datatorrent.demos.frauddetect; -import com.datatorrent.api.LocalMode; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * Fraud detection application test */ -public class FrauddetectApplicationTest { +public class FrauddetectApplicationTest +{ - public FrauddetectApplicationTest() { - } + public FrauddetectApplicationTest() + { + } - @Test - public void testApplication() throws Exception { - try - { - Application application = new Application(); - Configuration conf = new Configuration(false); - conf.addResource("dt-site-frauddetect.xml"); - LocalMode lma = LocalMode.newInstance(); - lma.prepareDAG(application, conf); - lma.getController().run(120000); - } - catch(Exception e) - { - e.printStackTrace(); - } + @Test + public void testApplication() throws Exception + { + try { + Application application = new Application(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-frauddetect.xml"); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(application, conf); + lma.getController().run(120000); + } catch (Exception e) { + e.printStackTrace(); } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java index 671cc72..21afc5b 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -49,19 +49,19 @@ public class MinimalWordCount implements StreamingApplication { static Map<String, Long> result; private static boolean done = false; - + public static boolean isDone() { return done; } - + @Override public void setup(Context.OperatorContext context) { done = false; result = new HashMap<>(); } - + public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>() { @Override @@ -74,7 +74,7 @@ public class MinimalWordCount implements StreamingApplication } }; } - + /** * Populate the dag using High-Level API. * @param dag @@ -93,7 +93,7 @@ public class MinimalWordCount implements StreamingApplication public Iterable<String> f(String input) { return Arrays.asList(input.split("[^a-zA-Z']+")); - + } }, name("ExtractWords")) // Apply windowing to the stream for counting, in this case, the window option is global window.
