http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java deleted file mode 100644 index a1ac603..0000000 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.distributeddistinct; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Properties; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.common.util.BaseOperator; - -import com.datatorrent.lib.algo.UniqueValueCount; -import com.datatorrent.lib.util.KeyValPair; - -public class StatefulUniqueCountTest -{ - - public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver"; - public static final String TABLE_NAME = "Test_Lookup_Cache"; - - static class KeyGen implements InputOperator - { - - public transient DefaultOutputPort<KeyValPair<Integer, Object>> output = new DefaultOutputPort<KeyValPair<Integer, Object>>(); - - @Override - public void beginWindow(long windowId) - { - } - - public void emitKeyVals(int key, int start, int end, int increment) - { - for (int i = start; i <= end; i += increment) { - output.emit(new KeyValPair<Integer, Object>(key, i)); - } - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(OperatorContext context) - { - - } - - @Override - public void teardown() - { - - } - - @Override - public void emitTuples() - { - emitKeyVals(1, 1, 10, 1); - emitKeyVals(2, 3, 15, 3); - emitKeyVals(3, 2, 20, 2); - emitKeyVals(1, 5, 15, 1); - emitKeyVals(2, 11, 20, 1); - emitKeyVals(3, 11, 20, 1); - } - } - - static class VerifyTable extends BaseOperator - { - - private static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - private static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver"; - protected static final String TABLE_NAME = "Test_Lookup_Cache"; - - public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() - { - @Override - public void process(Object tuple) - { - } - }; - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - try { - Class.forName(INMEM_DB_DRIVER).newInstance(); - Connection con = DriverManager.getConnection(INMEM_DB_URL, new Properties()); - Statement stmt = con.createStatement(); - ResultSet resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 1"); - ArrayList<Integer> answersOne = new ArrayList<Integer>(); - for (int i = 1; i < 16; i++) { - answersOne.add(i); - } - Assert.assertEquals(answersOne, processResult(resultSet)); - - resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 2"); - ArrayList<Integer> answersTwo = new ArrayList<Integer>(); - answersTwo.add(3); - answersTwo.add(6); - answersTwo.add(9); - for (int i = 11; i < 21; i++) { - answersTwo.add(i); - } - Assert.assertEquals(answersTwo, processResult(resultSet)); - - resultSet = stmt.executeQuery("SELECT col2 FROM " + TABLE_NAME + " WHERE col1 = 3"); - ArrayList<Integer> answersThree = new ArrayList<Integer>(); - answersThree.add(2); - answersThree.add(4); - answersThree.add(6); - answersThree.add(8); - answersThree.add(10); - for (int i = 11; i < 21; i++) { - answersThree.add(i); - } - Assert.assertEquals(answersThree, processResult(resultSet)); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - - } - - public static ArrayList<Integer> processResult(ResultSet resultSet) - { - ArrayList<Integer> tempList = new ArrayList<Integer>(); - try { - while (resultSet.next()) { - tempList.add(resultSet.getInt(1)); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - Collections.sort(tempList); - return tempList; - } - } - - public class Application implements StreamingApplication - { - @SuppressWarnings("unchecked") - @Override - public void populateDAG(DAG dag, Configuration conf) - { - KeyGen keyGen = dag.addOperator("KeyGenerator", new KeyGen()); - UniqueValueCount<Integer> valCount = dag.addOperator("ValueCounter", new UniqueValueCount<Integer>()); - IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("Unique", new IntegerUniqueValueCountAppender()); - VerifyTable verifyTable = dag.addOperator("VerifyTable", new VerifyTable()); - - @SuppressWarnings("rawtypes") - DefaultOutputPort valOut = valCount.output; - @SuppressWarnings("rawtypes") - DefaultOutputPort uniqueOut = uniqueUnifier.output; - dag.addStream("DataIn", keyGen.output, valCount.input); - dag.addStream("UnifyWindows", valOut, uniqueUnifier.input); - dag.addStream("ResultsOut", uniqueOut, verifyTable.input); - } - } - - @BeforeClass - public static void setup() - { - try { - Class.forName(INMEM_DB_DRIVER).newInstance(); - Connection con = DriverManager.getConnection(INMEM_DB_URL, new Properties()); - Statement stmt = con.createStatement(); - stmt.execute("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (col1 INTEGER, col2 INTEGER, col3 BIGINT)"); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testApplication() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.operator.Unique.prop.tableName", "Test_Lookup_Cache"); - conf.set("dt.operator.Unique.prop.store.dbUrl", "jdbc:hsqldb:mem:test;sql.syntax_mys=true"); - conf.set("dt.operator.Unique.prop.store.dbDriver", "org.hsqldb.jdbcDriver"); - - lma.prepareDAG(new Application(), conf); - lma.cloneDAG(); - LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); - lc.runAsync(); - - long now = System.currentTimeMillis(); - while (System.currentTimeMillis() - now < 15000) { - Thread.sleep(1000); - } - lc.shutdown(); - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/distributedistinct/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/resources/log4j.properties b/demos/distributedistinct/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/distributedistinct/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/pom.xml ---------------------------------------------------------------------- diff --git a/demos/echoserver/pom.xml b/demos/echoserver/pom.xml deleted file mode 100644 index 603c678..0000000 --- a/demos/echoserver/pom.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>malhar-demos</artifactId> - <groupId>org.apache.apex</groupId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <groupId>org.apache.apex</groupId> - <artifactId>echoserver</artifactId> - <packaging>jar</packaging> - - <!-- change these to the appropriate values --> - <name>Apache Apex Malhar EchoServer Demo</name> - <description>A demo server that echos data sent by a network client back to it</description> -</project> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/assemble/appPackage.xml b/demos/echoserver/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/echoserver/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java deleted file mode 100644 index 90a3fd2..0000000 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Application.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.echoserver; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -/** - * @since 2.1.0 - */ -@ApplicationAnnotation(name = "EchoServer") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - MessageReceiver receiver = dag.addOperator("Message Receiver", MessageReceiver.class); - MessageResponder responder = dag.addOperator("Message Responder", MessageResponder.class); - // Locality has to be container so that the operators use the same socket - dag.addStream("messages", receiver.messageOutput, responder.messageInput).setLocality(DAG.Locality.CONTAINER_LOCAL); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Message.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Message.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Message.java deleted file mode 100644 index 94ce1b7..0000000 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/Message.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.echoserver; - -import java.net.SocketAddress; - -/** - * @since 2.1.0 - */ -public class Message -{ - public String message; - public SocketAddress socketAddress; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageReceiver.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageReceiver.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageReceiver.java deleted file mode 100644 index 6dce73f..0000000 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageReceiver.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.echoserver; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; - -/** - * @since 2.1.0 - */ -public class MessageReceiver implements InputOperator, NetworkManager.ChannelListener<DatagramChannel> -{ - private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class); - - private transient NetworkManager.ChannelAction<DatagramChannel> action; - - //Need the sender info, using a packet for now instead of the buffer - private transient ByteBuffer buffer; - //private transient DatagramPacket packet; - - private int port = 9000; - private int maxMesgSize = 512; - private int inactiveWait = 10; - private boolean readReady = false; - - @Override - public void emitTuples() - { - boolean emitData = false; - if (readReady) { - //DatagramSocket socket = action.channelConfiguration.socket; - try { - //socket.receive(packet); - DatagramChannel channel = action.channelConfiguration.channel; - SocketAddress address = channel.receive(buffer); - if (address != null) { - /* - StringBuilder sb = new StringBuilder(); - buffer.rewind(); - while (buffer.hasRemaining()) { - sb.append(buffer.getChar()); - } - String mesg = sb.toString(); - */ - buffer.flip(); - String mesg = new String(buffer.array(), 0, buffer.limit()); - logger.info("Message {}", mesg); - Message message = new Message(); - message.message = mesg; - message.socketAddress = address; - messageOutput.emit(message); - emitData = true; - buffer.clear(); - } - //String mesg = new String(packet.getData(), packet.getOffset(), packet.getLength()); - } catch (IOException e) { - throw new RuntimeException("Error reading from channel", e); - } - // Even if we miss a readReady because of not locking we will get it again immediately - readReady = false; - } - if (!emitData) { - synchronized (buffer) { - try { - if (!readReady) { - buffer.wait(inactiveWait); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - } - - @Override - public void beginWindow(long l) - { - - } - - @Override - public void endWindow() - { - - } - - public final transient DefaultOutputPort<Message> messageOutput = new DefaultOutputPort<Message>(); - - @Override - public void setup(Context.OperatorContext context) - { - try { - //byte[] mesgData = new byte[maxMesgSize]; - //packet = new DatagramPacket(mesgData, maxMesgSize); - buffer = ByteBuffer.allocate(maxMesgSize); - action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, this, SelectionKey.OP_READ); - } catch (IOException e) { - throw new RuntimeException("Error initializing receiver", e); - } - } - - @Override - public void teardown() - { - try { - NetworkManager.getInstance().unregisterAction(action); - } catch (Exception e) { - throw new RuntimeException("Error shutting down receiver", e); - } - } - - @Override - public void ready(NetworkManager.ChannelAction<DatagramChannel> action, int readyOps) - { - synchronized (buffer) { - readReady = true; - buffer.notify(); - } - } - - public int getPort() - { - return port; - } - - public void setPort(int port) - { - this.port = port; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java deleted file mode 100644 index ce7a1bc..0000000 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/MessageResponder.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.echoserver; - -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.common.util.BaseOperator; - -/** - * @since 2.1.0 - */ -public class MessageResponder extends BaseOperator -{ - - private String responseHeader = "Response: "; - - private int port = 9000; - private int maxMesgSize = 512; - private transient NetworkManager.ChannelAction<DatagramChannel> action; - private transient ByteBuffer buffer; - - public final transient DefaultInputPort<Message> messageInput = new DefaultInputPort<Message>() - { - @Override - public void process(Message message) - { - String sendMesg = responseHeader + message.message; - SocketAddress address = message.socketAddress; - buffer.put(sendMesg.getBytes()); - buffer.flip(); - try { - action.channelConfiguration.channel.send(buffer, address); - } catch (IOException e) { - throw new RuntimeException(e); - } - buffer.clear(); - } - }; - - @Override - public void setup(Context.OperatorContext context) - { - try { - buffer = ByteBuffer.allocate(maxMesgSize); - action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, null, 0); - } catch (IOException e) { - throw new RuntimeException("Error initializer responder", e); - } - } - - @Override - public void teardown() - { - try { - NetworkManager.getInstance().unregisterAction(action); - } catch (Exception e) { - throw new RuntimeException("Error shutting down responder", e); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java b/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java deleted file mode 100644 index 056068f..0000000 --- a/demos/echoserver/src/main/java/com/datatorrent/demos/echoserver/NetworkManager.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.echoserver; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @since 2.1.0 - */ -public class NetworkManager implements Runnable -{ - private static final Logger logger = LoggerFactory.getLogger(NetworkManager.class); - - public static enum ConnectionType - { - TCP, - UDP - } - - private static NetworkManager _instance; - private Selector selector; - - private volatile boolean doRun = false; - private Thread selThread; - private long selTimeout = 1000; - private volatile Exception selEx; - - private Map<ConnectionInfo, ChannelConfiguration> channels; - private Map<SelectableChannel, ChannelConfiguration> channelConfigurations; - - public static NetworkManager getInstance() throws IOException - { - if (_instance == null) { - synchronized (NetworkManager.class) { - if (_instance == null) { - _instance = new NetworkManager(); - } - } - } - return _instance; - } - - private NetworkManager() throws IOException - { - channels = new HashMap<ConnectionInfo, ChannelConfiguration>(); - channelConfigurations = new HashMap<SelectableChannel, ChannelConfiguration>(); - } - - public synchronized <T extends SelectableChannel> ChannelAction<T> registerAction(int port, ConnectionType type, ChannelListener<T> listener, int ops) throws IOException - { - boolean startProc = (channels.size() == 0); - SelectableChannel channel = null; - SocketAddress address = new InetSocketAddress(port); - ConnectionInfo connectionInfo = new ConnectionInfo(); - connectionInfo.address = address; - connectionInfo.connectionType = type; - ChannelConfiguration channelConfiguration = channels.get(connectionInfo); - if (channelConfiguration == null) { - Object socket = null; - if (type == ConnectionType.TCP) { - SocketChannel schannel = SocketChannel.open(); - schannel.configureBlocking(false); - Socket ssocket = schannel.socket(); - ssocket.bind(address); - socket = ssocket; - channel = schannel; - } else if (type == ConnectionType.UDP) { - DatagramChannel dchannel = DatagramChannel.open(); - dchannel.configureBlocking(false); - DatagramSocket dsocket = dchannel.socket(); - dsocket.bind(address); - socket = dsocket; - channel = dchannel; - } - if (channel == null) { - throw new IOException("Unsupported connection type"); - } - channelConfiguration = new ChannelConfiguration(); - channelConfiguration.actions = new ConcurrentLinkedQueue<ChannelAction>(); - channelConfiguration.channel = channel; - channelConfiguration.connectionInfo = connectionInfo; - channels.put(connectionInfo, channelConfiguration); - channelConfigurations.put(channel, channelConfiguration); - } else { - channel = channelConfiguration.channel; - } - ChannelAction channelAction = new ChannelAction(); - channelAction.channelConfiguration = channelConfiguration; - channelAction.listener = listener; - channelAction.ops = ops; - channelConfiguration.actions.add(channelAction); - if (startProc) { - startProcess(); - } - if (listener != null) { - channel.register(selector, ops); - } - return channelAction; - } - - public synchronized void unregisterAction(ChannelAction action) throws IOException, InterruptedException - { - ChannelConfiguration channelConfiguration = action.channelConfiguration; - SelectableChannel channel = channelConfiguration.channel; - if (channelConfiguration != null) { - channelConfiguration.actions.remove(action); - if (channelConfiguration.actions.size() == 0) { - ConnectionInfo connectionInfo = channelConfiguration.connectionInfo; - channelConfigurations.remove(channel); - channels.remove(connectionInfo); - channel.close(); - } - } - if (channels.size() == 0) { - stopProcess(); - } - } - - private void startProcess() throws IOException - { - selector = Selector.open(); - doRun = true; - selThread = new Thread(this); - selThread.start(); - } - - private void stopProcess() throws InterruptedException, IOException - { - doRun = false; - selThread.join(); - selector.close(); - } - - @Override - public void run() - { - try { - while (doRun) { - int keys = selector.select(selTimeout); - if (keys > 0) { - Set<SelectionKey> selectionKeys = selector.selectedKeys(); - for (SelectionKey selectionKey : selectionKeys) { - int readyOps = selectionKey.readyOps(); - ChannelConfiguration channelConfiguration = channelConfigurations.get(selectionKey.channel()); - Collection<ChannelAction> actions = channelConfiguration.actions; - for (ChannelAction action : actions) { - if (((readyOps & action.ops) != 0) && (action.listener != null)) { - action.listener.ready(action, readyOps); - } - } - } - selectionKeys.clear(); - } - } - } catch (IOException e) { - logger.error("Error in select", e); - selEx = e; - } - } - - public static interface ChannelListener<T extends SelectableChannel> - { - public void ready(ChannelAction<T> action, int readyOps); - } - - public static class ChannelConfiguration<T extends SelectableChannel> - { - public T channel; - public ConnectionInfo connectionInfo; - public Collection<ChannelAction> actions; - } - - public static class ChannelAction<T extends SelectableChannel> - { - public ChannelConfiguration<T> channelConfiguration; - public ChannelListener<T> listener; - public int ops; - } - - private static class ConnectionInfo - { - public SocketAddress address; - public ConnectionType connectionType; - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ConnectionInfo that = (ConnectionInfo)o; - - if (connectionType != that.connectionType) { - return false; - } - if (!address.equals(that.address)) { - return false; - } - - return true; - } - - @Override - public int hashCode() - { - int result = address.hashCode(); - result = 31 * result + connectionType.hashCode(); - return result; - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/main/resources/META-INF/properties.xml b/demos/echoserver/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 0822f4f..0000000 --- a/demos/echoserver/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <!-- - <property> - <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> - <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> - </property> - --> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> - <property> - <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name> - <value>hello world: %s</value> - </property> -</configuration> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/site/conf/my-app-conf1.xml b/demos/echoserver/src/site/conf/my-app-conf1.xml deleted file mode 100644 index f35873b..0000000 --- a/demos/echoserver/src/site/conf/my-app-conf1.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java b/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java deleted file mode 100644 index 8c52a9d..0000000 --- a/demos/echoserver/src/test/java/com/datatorrent/demos/echoserver/ApplicationTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.echoserver; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest -{ - - @Test - public void testApplication() throws IOException, Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); // runs for 10 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/echoserver/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/echoserver/src/test/resources/log4j.properties b/demos/echoserver/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/echoserver/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/pom.xml ---------------------------------------------------------------------- diff --git a/demos/frauddetect/pom.xml b/demos/frauddetect/pom.xml deleted file mode 100644 index 5900354..0000000 --- a/demos/frauddetect/pom.xml +++ /dev/null @@ -1,49 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>frauddetect-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Fraud Detect Demo</name> - <description>Apex demo application that demonstrates real-time pattern detection in the incoming data and alerting. The demo processes streaming credit card transactions and looks for fraudulent transactions.</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <groupId>org.mongodb</groupId> - <artifactId>mongo-java-driver</artifactId> - <version>2.10.1</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/assemble/appPackage.xml b/demos/frauddetect/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/frauddetect/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java deleted file mode 100644 index 8d7c325..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/Application.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -import java.io.Serializable; -import java.net.URI; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.DAGContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.demos.frauddetect.operator.HdfsStringOutputOperator; -import com.datatorrent.demos.frauddetect.operator.MongoDBOutputOperator; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketInputOperator; -import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; -import com.datatorrent.lib.math.RangeKeyVal; -import com.datatorrent.lib.multiwindow.SimpleMovingAverage; -import com.datatorrent.lib.util.BaseKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.netlet.util.DTThrowable; - - -/** - * Fraud detection application - * - * @since 0.9.0 - */ -@ApplicationAnnotation(name = "FraudDetectDemo") -public class Application implements StreamingApplication -{ - - public PubSubWebSocketInputOperator getPubSubWebSocketInputOperator(String name, DAG dag, URI duri, String topic) throws Exception - { - PubSubWebSocketInputOperator reqin = dag.addOperator(name, new PubSubWebSocketInputOperator()); - reqin.setUri(duri); - reqin.setTopic(topic); - return reqin; - } - - public PubSubWebSocketOutputOperator getPubSubWebSocketOutputOperator(String name, DAG dag, URI duri, String topic) throws Exception - { - PubSubWebSocketOutputOperator out = dag.addOperator(name, new PubSubWebSocketOutputOperator()); - out.setUri(duri); - return out; - } - - public HdfsStringOutputOperator getHdfsOutputOperator(String name, DAG dag, String folderName) - { - HdfsStringOutputOperator oper = dag.addOperator("hdfs", HdfsStringOutputOperator.class); - oper.setFilePath(folderName); - oper.setMaxLength(1024 * 1024 * 1024); - return oper; - } - - public ConsoleOutputOperator getConsoleOperator(String name, DAG dag, String prefix, String format) - { - ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class); - // oper.setStringFormat(prefix + ": " + format); - return oper; - } - - public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable - { - private static final long serialVersionUID = 201410031623L; - } - - @SuppressWarnings("unchecked") - @Override - public void populateDAG(DAG dag, Configuration conf) - { - - try { - String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS); - if (gatewayAddress == null) { - gatewayAddress = "localhost:9090"; - } - URI duri = URI.create("ws://" + gatewayAddress + "/pubsub"); - - PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "demos.app.frauddetect.submitTransaction"); - PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "demos.app.frauddetect.fraudAlert"); - PubSubWebSocketOutputOperator avgUserAlertwsOutput = getPubSubWebSocketOutputOperator("avgUserAlertQueryOutput", dag, duri, "demos.app.frauddetect.fraudAlert"); - PubSubWebSocketOutputOperator binUserAlertwsOutput = getPubSubWebSocketOutputOperator("binUserAlertOutput", dag, duri, "demos.app.frauddetect.fraudAlert"); - PubSubWebSocketOutputOperator txSummaryWsOutput = getPubSubWebSocketOutputOperator("txSummaryWsOutput", dag, duri, "demos.app.frauddetect.txSummary"); - SlidingWindowSumKeyVal<KeyValPair<MerchantKey, String>, Integer> smsOperator = dag.addOperator("movingSum", SlidingWindowSumKeyVal.class); - - MerchantTransactionGenerator txReceiver = dag.addOperator("txReceiver", MerchantTransactionGenerator.class); - MerchantTransactionInputHandler txInputHandler = dag.addOperator("txInputHandler", new MerchantTransactionInputHandler()); - BankIdNumberSamplerOperator binSampler = dag.addOperator("bankInfoFraudDetector", BankIdNumberSamplerOperator.class); - - MerchantTransactionBucketOperator txBucketOperator = dag.addOperator("txFilter", MerchantTransactionBucketOperator.class); - RangeKeyVal rangeOperator = dag.addOperator("rangePerMerchant", new RangeKeyVal<MerchantKey, Long>()); - SimpleMovingAverage<MerchantKey, Long> smaOperator = dag.addOperator("smaPerMerchant", SimpleMovingAverage.class); - TransactionStatsAggregator txStatsAggregator = dag.addOperator("txStatsAggregator", TransactionStatsAggregator.class); - AverageAlertingOperator avgAlertingOperator = dag.addOperator("avgAlerter", AverageAlertingOperator.class); - CreditCardAmountSamplerOperator ccSamplerOperator = dag.addOperator("amountFraudDetector", CreditCardAmountSamplerOperator.class); - HdfsStringOutputOperator hdfsOutputOperator = getHdfsOutputOperator("hdfsOutput", dag, "fraud"); - - MongoDBOutputOperator mongoTxStatsOperator = dag.addOperator("mongoTxStatsOutput", MongoDBOutputOperator.class); - MongoDBOutputOperator mongoBinAlertsOperator = dag.addOperator("mongoBinAlertsOutput", MongoDBOutputOperator.class); - MongoDBOutputOperator mongoCcAlertsOperator = dag.addOperator("mongoCcAlertsOutput", MongoDBOutputOperator.class); - MongoDBOutputOperator mongoAvgAlertsOperator = dag.addOperator("mongoAvgAlertsOutput", MongoDBOutputOperator.class); - - dag.addStream("userTxStream", userTxWsInput.outputPort, txInputHandler.userTxInputPort); - dag.addStream("transactions", txReceiver.txOutputPort, txBucketOperator.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL); - dag.addStream("txData", txReceiver.txDataOutputPort, hdfsOutputOperator.input); // dump all tx into Hdfs - dag.addStream("userTransactions", txInputHandler.txOutputPort, txBucketOperator.txUserInputPort); - dag.addStream("bankInfoData", txBucketOperator.binCountOutputPort, smsOperator.data); - dag.addStream("bankInfoCount", smsOperator.integerSum, binSampler.txCountInputPort); - dag.addStream("filteredTransactions", txBucketOperator.txOutputPort, rangeOperator.data, smaOperator.data, avgAlertingOperator.txInputPort); - - KeyPartitionCodec<MerchantKey, Long> txCodec = new KeyPartitionCodec<MerchantKey, Long>(); - dag.setInputPortAttribute(rangeOperator.data, Context.PortContext.STREAM_CODEC, txCodec); - dag.setInputPortAttribute(smaOperator.data, Context.PortContext.STREAM_CODEC, txCodec); - dag.setInputPortAttribute(avgAlertingOperator.txInputPort, Context.PortContext.STREAM_CODEC, txCodec); - - dag.addStream("creditCardData", txBucketOperator.ccAlertOutputPort, ccSamplerOperator.inputPort); - dag.addStream("txnSummaryData", txBucketOperator.summaryTxnOutputPort, txSummaryWsOutput.input); - dag.addStream("smaAlerts", smaOperator.doubleSMA, avgAlertingOperator.smaInputPort); - dag.addStream("binAlerts", binSampler.countAlertOutputPort, mongoBinAlertsOperator.inputPort); - dag.addStream("binAlertsNotification", binSampler.countAlertNotificationPort, binUserAlertwsOutput.input); - dag.addStream("rangeData", rangeOperator.range, txStatsAggregator.rangeInputPort); - dag.addStream("smaData", smaOperator.longSMA, txStatsAggregator.smaInputPort); - dag.addStream("txStatsOutput", txStatsAggregator.txDataOutputPort, mongoTxStatsOperator.inputPort); - dag.addStream("avgAlerts", avgAlertingOperator.avgAlertOutputPort, mongoAvgAlertsOperator.inputPort); - dag.addStream("avgAlertsNotification", avgAlertingOperator.avgAlertNotificationPort, avgUserAlertwsOutput.input); - dag.addStream("ccAlerts", ccSamplerOperator.ccAlertOutputPort, mongoCcAlertsOperator.inputPort); - dag.addStream("ccAlertsNotification", ccSamplerOperator.ccAlertNotificationPort, ccUserAlertWsOutput.input); - - } catch (Exception exc) { - DTThrowable.rethrow(exc); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertData.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertData.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertData.java deleted file mode 100644 index 961be96..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertData.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -/** - * POJO to capture average alert data. - * - * @since 0.9.0 - */ -public class AverageAlertData -{ - public String merchantId; - public int terminalId; - public int zipCode; - public MerchantTransaction.MerchantType merchantType; - public long amount; - public double lastSmaValue; - public double change; - public boolean userGenerated; - public long time; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java deleted file mode 100644 index b813a40..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/AverageAlertingOperator.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.validation.constraints.NotNull; - -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.mutable.MutableDouble; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.frauddetect.util.JsonUtils; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Generate an alert if the current transaction amount received on tx input port for the given key is greater by n % - * than the SMA of the last application window as received on the SMA input port. - * - * @since 0.9.0 - */ -public class AverageAlertingOperator extends BaseOperator -{ - private static final Logger Log = LoggerFactory.getLogger(AverageAlertingOperator.class); - private final transient JsonFactory jsonFactory = new JsonFactory(); - private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); - private Map<MerchantKey, MutableDouble> lastSMAMap = new HashMap<MerchantKey, MutableDouble>(); - private Map<MerchantKey, MutableDouble> currentSMAMap = new HashMap<MerchantKey, MutableDouble>(); - private List<AverageAlertData> alerts = new ArrayList<AverageAlertData>(); - @NotNull - private int threshold; - private static final String brickMortarAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s at Terminal %d!"; - private static final String internetAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s!"; - public final transient DefaultOutputPort<String> avgAlertOutputPort = new DefaultOutputPort<String>(); - public final transient DefaultOutputPort<Map<String, Object>> avgAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>(); - public final transient DefaultInputPort<KeyValPair<MerchantKey, Double>> smaInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, Double>>() - { - @Override - public void process(KeyValPair<MerchantKey, Double> tuple) - { - MutableDouble currentSma = currentSMAMap.get(tuple.getKey()); - if (currentSma == null) { // first sma for the given key - double sma = tuple.getValue(); - currentSMAMap.put(tuple.getKey(), new MutableDouble(sma)); - //lastSMAMap.put(tuple.getKey(), new MutableDouble(sma)); - } else { // move the current SMA value to the last SMA Map - //lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue()); - currentSma.setValue(tuple.getValue()); // update the current SMA value - } - } - - }; - public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> txInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, Long>>() - { - @Override - public void process(KeyValPair<MerchantKey, Long> tuple) - { - processTuple(tuple); - } - - }; - - private void processTuple(KeyValPair<MerchantKey, Long> tuple) - { - MerchantKey merchantKey = tuple.getKey(); - MutableDouble lastSma = lastSMAMap.get(tuple.getKey()); - long txValue = tuple.getValue(); - if (lastSma != null && txValue > lastSma.doubleValue()) { - double lastSmaValue = lastSma.doubleValue(); - double change = txValue - lastSmaValue; - if (change > threshold) { // generate an alert - AverageAlertData data = getOutputData(merchantKey, txValue, change, lastSmaValue); - alerts.add(data); - //if (userGenerated) { // if its user generated only the pass it to WebSocket - if (merchantKey.merchantType == MerchantTransaction.MerchantType.BRICK_AND_MORTAR) { - avgAlertNotificationPort.emit(getOutputData(data, String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId, merchantKey.terminalId))); - } else { // its internet based - avgAlertNotificationPort.emit(getOutputData(data, String.format(internetAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId))); - - } - //} - } - } - } - - @Override - public void endWindow() - { - for (AverageAlertData data : alerts) { - try { - avgAlertOutputPort.emit(JsonUtils.toJson(data)); - } catch (IOException e) { - logger.warn("Exception while converting object to JSON", e); - } - } - - alerts.clear(); - - for (Map.Entry<MerchantKey, MutableDouble> entry : currentSMAMap.entrySet()) { - MerchantKey key = entry.getKey(); - MutableDouble currentSma = entry.getValue(); - MutableDouble lastSma = lastSMAMap.get(key); - if (lastSma == null) { - lastSma = new MutableDouble(currentSma.doubleValue()); - lastSMAMap.put(key, lastSma); - } else { - lastSma.setValue(currentSma.getValue()); - } - } - } - - private AverageAlertData getOutputData(MerchantKey key, long amount, double change, double lastSmaValue) - { - AverageAlertData data = new AverageAlertData(); - - data.merchantId = key.merchantId; - data.terminalId = key.terminalId == null ? 0 : key.terminalId; - data.zipCode = key.zipCode; - data.merchantType = key.merchantType; - data.amount = amount; - data.lastSmaValue = lastSmaValue; - data.change = change; - //data.userGenerated = userGenerated; - data.userGenerated = key.userGenerated; - data.time = System.currentTimeMillis(); - - return data; - } - - private Map<String, Object> getOutputData(AverageAlertData data, String msg) - { - Map<String, Object> output = new HashMap<String, Object>(); - output.put("message", msg); - output.put("alertType", "aboveAvg"); - output.put("userGenerated", "" + data.userGenerated); - output.put("alertData", data); - - try { - String str = mapper.writeValueAsString(output); - logger.debug("user generated tx alert: " + str); - } catch (Exception exc) { - //ignore - } - return output; - } - - public int getThreshold() - { - return threshold; - } - - public void setThreshold(int threshold) - { - this.threshold = threshold; - } - - private static final Logger logger = LoggerFactory.getLogger(AverageAlertingOperator.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberAlertData.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberAlertData.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberAlertData.java deleted file mode 100644 index 6f18199..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberAlertData.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -/** - * POJO to capture data related to alerts for repetitive bank id number data usage. - * - * @since 0.9.0 - */ -public class BankIdNumberAlertData -{ - public String merchantId; - public int terminalId; - public int zipCode; - public MerchantTransaction.MerchantType merchantType; - public String bankIdNum; - public int count; - public boolean userGenerated; - public long time; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java deleted file mode 100644 index 87cf043..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberKey.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -import java.io.Serializable; - -import com.datatorrent.lib.util.TimeBucketKey; - -/** - * Bank Id Number Key - * - * @since 0.9.0 - */ -public class BankIdNumberKey extends TimeBucketKey implements Serializable -{ - public String bankIdNum; - - public BankIdNumberKey() - { - } - - @Override - public int hashCode() - { - int key = 0; - key |= (1 << 1); - key |= (bankIdNum.hashCode()); - return super.hashCode() ^ key; - } - - @Override - public boolean equals(Object obj) - { - if (!(obj instanceof BankIdNumberKey)) { - return false; - } - return super.equals(obj) - && bankIdNum.equals(((BankIdNumberKey)obj).bankIdNum); - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(super.toString()); - sb.append("|1:").append(bankIdNum); - return sb.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java deleted file mode 100644 index abfa202..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/BankIdNumberSamplerOperator.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.map.ObjectMapper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.mutable.MutableLong; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.demos.frauddetect.util.JsonUtils; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Count the transactions for the underlying aggregation window if the same BIN is - * being used for more than defined number of transactions. Output the data as needed - * by Mongo output operator - * - * @since 0.9.0 - */ -public class BankIdNumberSamplerOperator extends BaseOperator -{ - private final transient JsonFactory jsonFactory = new JsonFactory(); - private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); - private int threshold; - private Map<MerchantKey, Map<String, BankIdNumData>> bankIdNumCountMap = new HashMap<MerchantKey, Map<String, BankIdNumData>>(); - private static final String ALERT_MSG = - "Potential fraudulent CC transactions (same bank id %s and merchant %s) total transactions: %d"; - /** - * Output the key-value pair for the BIN as key with the count as value. - */ - public final transient DefaultOutputPort<String> countAlertOutputPort = - new DefaultOutputPort<String>(); - public final transient DefaultOutputPort<Map<String, Object>> countAlertNotificationPort = - new DefaultOutputPort<Map<String, Object>>(); - - public int getThreshold() - { - return threshold; - } - - public void setThreshold(int threshold) - { - this.threshold = threshold; - } - - /* - public final transient DefaultInputPort<KeyValPair<MerchantKey, String>> txInputPort = - new DefaultInputPort<KeyValPair<MerchantKey, String>>() - { - @Override - public void process(KeyValPair<MerchantKey, String> tuple) - { - processTuple(tuple); - } - - }; - - private void processTuple(KeyValPair<MerchantKey, String> tuple) - { - MerchantKey key = tuple.getKey(); - Map<String, BankIdNumData> map = bankIdNumCountMap.get(key); - if (map == null) { - map = new HashMap<String, BankIdNumData>(); - bankIdNumCountMap.put(key, map); - } - String bankIdNum = tuple.getValue(); - BankIdNumData bankIdNumData = map.get(bankIdNum); - if (bankIdNumData == null) { - bankIdNumData = new BankIdNumData(); - bankIdNumData.bankIdNum = bankIdNum; - map.put(bankIdNum, bankIdNumData); - } - bankIdNumData.count.increment(); - if (key.userGenerated) { - bankIdNumData.userGenerated = true; - } - } - */ - - public final transient DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> txCountInputPort = - new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>() - { - @Override - public void process(KeyValPair<KeyValPair<MerchantKey, String>, Integer> tuple) - { - processTuple(tuple.getKey(), tuple.getValue()); - } - - }; - - private void processTuple(KeyValPair<MerchantKey, String> tuple, Integer count) - { - MerchantKey key = tuple.getKey(); - Map<String, BankIdNumData> map = bankIdNumCountMap.get(key); - if (map == null) { - map = new HashMap<String, BankIdNumData>(); - bankIdNumCountMap.put(key, map); - } - String bankIdNum = tuple.getValue(); - BankIdNumData bankIdNumData = map.get(bankIdNum); - if (bankIdNumData == null) { - bankIdNumData = new BankIdNumData(); - bankIdNumData.bankIdNum = bankIdNum; - map.put(bankIdNum, bankIdNumData); - } - bankIdNumData.count.setValue(count); - if (key.userGenerated) { - bankIdNumData.userGenerated = true; - } - } - - /** - * Go through the BIN Counter map and check if any of the values for the BIN exceed the threshold. - * If yes, generate the alert on the output port. - */ - @Override - public void endWindow() - { - for (Map.Entry<MerchantKey, Map<String, BankIdNumData>> entry : bankIdNumCountMap.entrySet()) { - List<BankIdNumData> list = null; - MerchantKey key = entry.getKey(); - if (key.merchantType == MerchantTransaction.MerchantType.INTERNET) { - continue; - } - list = dataOutput(entry.getValue()); - if (list.size() > 0) { - for (BankIdNumData binData : list) { - BankIdNumberAlertData data = new BankIdNumberAlertData(); - data.merchantId = key.merchantId; - data.terminalId = key.terminalId == null ? 0 : key.terminalId; - data.zipCode = key.zipCode; - data.merchantType = key.merchantType; - data.bankIdNum = binData.bankIdNum; - data.count = binData.count.intValue(); - data.userGenerated = binData.userGenerated; - data.time = System.currentTimeMillis(); - try { - countAlertOutputPort.emit(JsonUtils.toJson(data)); - countAlertNotificationPort.emit(getOutputData(data)); - } catch (IOException e) { - logger.warn("Exception while converting object to JSON: ", e); - } - } - } - } - bankIdNumCountMap.clear(); - } - - private List<BankIdNumData> dataOutput(Map<String, BankIdNumData> map) - { - List<BankIdNumData> list = new ArrayList<BankIdNumData>(); - int count = 0; - for (Map.Entry<String, BankIdNumData> bankIdEntry : map.entrySet()) { - BankIdNumData data = bankIdEntry.getValue(); - if (data.count.intValue() > threshold) { - list.add(data); - } - } - return list; - } - - private Map<String, Object> getOutputData(BankIdNumberAlertData data) - { - Map<String, Object> output = new HashMap<String, Object>(); - output.put("message", String.format(ALERT_MSG, data.bankIdNum, data.merchantId, data.count)); - output.put("alertType", "sameBankId"); - output.put("userGenerated", "" + data.userGenerated); - output.put("alertData", data); - - try { - String str = mapper.writeValueAsString(output); - logger.debug("user generated tx alert: " + str); - } catch (Exception exc) { - //ignore - } - - return output; - } - - public static final class BankIdNumData - { - public String bankIdNum; - public MutableLong count = new MutableLong(); - public boolean userGenerated = false; - } - - private static final Logger logger = LoggerFactory.getLogger(BankIdNumberSamplerOperator.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAlertData.java ---------------------------------------------------------------------- diff --git a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAlertData.java b/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAlertData.java deleted file mode 100644 index 885bf94..0000000 --- a/demos/frauddetect/src/main/java/com/datatorrent/demos/frauddetect/CreditCardAlertData.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.frauddetect; - -/** - * POJO to capture data related to alerts for credit card number. - * - * @since 0.9.0 - */ -public class CreditCardAlertData -{ - public String merchantId; - public int terminalId; - public int zipCode; - public MerchantTransaction.MerchantType merchantType; - public String fullCcNum; - public long small; - public long large; - public double threshold; - public boolean userGenerated; - public long time; -}
