http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java ---------------------------------------------------------------------- diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java new file mode 100644 index 0000000..34a2c56 --- /dev/null +++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageReceiver.java @@ -0,0 +1,156 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.echoserver;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;

/**
 * Input operator that receives UDP datagrams on {@link #getPort() port} and emits each one as a
 * {@link Message} carrying the decoded text and the sender's address.
 *
 * <p>The operator registers itself with {@link NetworkManager} for {@code OP_READ}. The manager's
 * selector thread calls {@link #ready} when the channel becomes readable; {@link #emitTuples()}
 * then reads at most one datagram per invocation. The receive buffer doubles as the monitor used
 * to hand the "readable" signal from the selector thread to the operator thread.
 *
 * @since 2.1.0
 */
public class MessageReceiver implements InputOperator, NetworkManager.ChannelListener<DatagramChannel>
{
  private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

  private transient NetworkManager.ChannelAction<DatagramChannel> action;

  // Receive buffer; also used as the wait/notify monitor between ready() and emitTuples().
  private transient ByteBuffer buffer;

  private int port = 9000;
  // Capacity of the receive buffer in bytes.
  private int maxMesgSize = 512;
  // Milliseconds emitTuples() waits for a readiness signal when it had nothing to emit.
  private int inactiveWait = 10;
  // Written by the selector thread in ready() and read by the operator thread, hence volatile.
  private volatile boolean readReady = false;

  /**
   * Reads one pending datagram, if the channel was flagged readable, and emits it. When nothing
   * was emitted the call waits up to {@code inactiveWait} milliseconds for the next readiness
   * signal.
   *
   * @throws RuntimeException wrapping an {@link IOException} from the channel, or an
   *         {@link InterruptedException} (the thread's interrupt status is restored first)
   */
  @Override
  public void emitTuples()
  {
    boolean emitData = false;
    if (readReady) {
      // Clear the flag before reading so that a signal raised while the read is in progress
      // survives until the next call instead of being overwritten afterwards.
      readReady = false;
      try {
        DatagramChannel channel = action.channelConfiguration.channel;
        SocketAddress address = channel.receive(buffer);
        // A null address means no datagram was available and the buffer was left untouched.
        if (address != null) {
          buffer.flip();
          // Decode with an explicit charset; the platform default differs between hosts.
          String mesg = new String(buffer.array(), buffer.arrayOffset(), buffer.limit(), StandardCharsets.UTF_8);
          buffer.clear();
          logger.info("Message {}", mesg);
          Message message = new Message();
          message.message = mesg;
          message.socketAddress = address;
          messageOutput.emit(message);
          emitData = true;
        }
      } catch (IOException e) {
        throw new RuntimeException("Error reading from channel", e);
      }
    }
    if (!emitData) {
      synchronized (buffer) {
        try {
          if (!readReady) {
            buffer.wait(inactiveWait);
          }
        } catch (InterruptedException e) {
          // Restore the interrupt status so callers further up can observe the interruption.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }
  }

  @Override
  public void beginWindow(long l)
  {

  }

  @Override
  public void endWindow()
  {

  }

  public final transient DefaultOutputPort<Message> messageOutput = new DefaultOutputPort<Message>();

  /**
   * Allocates the receive buffer and registers this operator as an {@code OP_READ} listener for
   * the configured UDP port.
   *
   * @throws RuntimeException wrapping any {@link IOException} raised during registration
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    try {
      // The buffer must exist before registration: ready() synchronizes on it.
      buffer = ByteBuffer.allocate(maxMesgSize);
      action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, this, SelectionKey.OP_READ);
    } catch (IOException e) {
      throw new RuntimeException("Error initializing receiver", e);
    }
  }

  /**
   * Unregisters the listener. Does nothing when {@link #setup} failed before the registration
   * was recorded.
   */
  @Override
  public void teardown()
  {
    if (action == null) {
      return;
    }
    try {
      NetworkManager.getInstance().unregisterAction(action);
    } catch (Exception e) {
      throw new RuntimeException("Error shutting down receiver", e);
    }
  }

  /**
   * Selector-thread callback: flags the channel as readable and wakes a waiting
   * {@link #emitTuples()} call.
   */
  @Override
  public void ready(NetworkManager.ChannelAction<DatagramChannel> action, int readyOps)
  {
    synchronized (buffer) {
      readReady = true;
      buffer.notify();
    }
  }

  public int getPort()
  {
    return port;
  }

  public void setPort(int port)
  {
    this.port = port;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java ---------------------------------------------------------------------- diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java new file mode 100644 index 0000000..d34bf3e --- /dev/null +++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/MessageResponder.java @@ -0,0 +1,81 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.echoserver;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

/**
 * Operator that answers every incoming {@link Message} by sending
 * {@code responseHeader + message.message} as a UDP datagram to the message's socket address.
 *
 * @since 2.1.0
 */
public class MessageResponder extends BaseOperator
{

  private String responseHeader = "Response: ";

  private int port = 9000;
  // Initial capacity of the send buffer; the buffer grows when a response needs more room.
  private int maxMesgSize = 512;
  private transient NetworkManager.ChannelAction<DatagramChannel> action;
  private transient ByteBuffer buffer;

  public final transient DefaultInputPort<Message> messageInput = new DefaultInputPort<Message>()
  {
    /**
     * Encodes the response as UTF-8 and sends it to the originator of {@code message}.
     *
     * @throws RuntimeException wrapping an {@link IOException} raised by the channel
     */
    @Override
    public void process(Message message)
    {
      String sendMesg = responseHeader + message.message;
      SocketAddress address = message.socketAddress;
      // Explicit charset: the platform default differs between hosts.
      byte[] payload = sendMesg.getBytes(StandardCharsets.UTF_8);
      // The header makes the response longer than the received text, so a fixed-size buffer
      // overflows for messages close to the receive limit; grow it on demand.
      if (payload.length > buffer.capacity()) {
        buffer = ByteBuffer.allocate(payload.length);
      }
      // Reset first so leftovers from an earlier failed send cannot leak into this datagram.
      buffer.clear();
      buffer.put(payload);
      buffer.flip();
      try {
        action.channelConfiguration.channel.send(buffer, address);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  };

  /**
   * Allocates the send buffer and registers with {@link NetworkManager} for the configured UDP
   * port, without a listener and with no selector interest.
   *
   * @throws RuntimeException wrapping any {@link IOException} raised during registration
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    try {
      buffer = ByteBuffer.allocate(maxMesgSize);
      action = NetworkManager.getInstance().registerAction(port, NetworkManager.ConnectionType.UDP, null, 0);
    } catch (IOException e) {
      throw new RuntimeException("Error initializing responder", e);
    }
  }

  /**
   * Releases the registration. Does nothing when {@link #setup} failed before the registration
   * was recorded.
   */
  @Override
  public void teardown()
  {
    if (action == null) {
      return;
    }
    try {
      NetworkManager.getInstance().unregisterAction(action);
    } catch (Exception e) {
      throw new RuntimeException("Error shutting down responder", e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java ---------------------------------------------------------------------- diff --git a/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java
b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java new file mode 100644 index 0000000..88cf621 --- /dev/null +++ b/examples/echoserver/src/main/java/org/apache/apex/examples/echoserver/NetworkManager.java @@ -0,0 +1,249 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.echoserver;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide registry of non-blocking channels keyed by (local address, connection type).
 *
 * <p>Several clients may register an action for the same port; they then share one channel. The
 * first registration starts a selector thread that dispatches readiness events to the listeners
 * of the ready channel; removing the last registration stops that thread and closes the selector.
 * {@link #registerAction} and {@link #unregisterAction} are synchronized on the manager; the
 * selector thread only reads the concurrent channel map and the per-channel action queue.
 *
 * @since 2.1.0
 */
public class NetworkManager implements Runnable
{
  private static final Logger logger = LoggerFactory.getLogger(NetworkManager.class);

  public static enum ConnectionType
  {
    TCP,
    UDP
  }

  // volatile: without it the unsynchronized first check in getInstance() may observe a
  // partially constructed instance.
  private static volatile NetworkManager _instance;
  private Selector selector;

  private volatile boolean doRun = false;
  private Thread selThread;
  private long selTimeout = 1000;
  private volatile Exception selEx;

  private Map<ConnectionInfo, ChannelConfiguration> channels;
  // Read by the selector thread without holding the manager lock, hence a concurrent map.
  private Map<SelectableChannel, ChannelConfiguration> channelConfigurations;

  /**
   * Returns the singleton, creating it on first use.
   */
  public static NetworkManager getInstance() throws IOException
  {
    if (_instance == null) {
      synchronized (NetworkManager.class) {
        if (_instance == null) {
          _instance = new NetworkManager();
        }
      }
    }
    return _instance;
  }

  private NetworkManager() throws IOException
  {
    channels = new HashMap<ConnectionInfo, ChannelConfiguration>();
    channelConfigurations = new ConcurrentHashMap<SelectableChannel, ChannelConfiguration>();
  }

  /**
   * Registers an action on the channel bound to {@code port}, opening and binding the channel if
   * this is the first registration for that (port, type) pair.
   *
   * @param port local port to bind
   * @param type transport of the channel
   * @param listener callback for readiness events; {@code null} registers no selector interest
   * @param ops {@link SelectionKey} operation bits the listener is interested in
   * @return the handle to pass to {@link #unregisterAction}
   * @throws IOException if the channel cannot be opened, bound or registered, or the connection
   *         type is unsupported
   */
  // Raw ChannelConfiguration/ChannelAction are kept because the shared maps store channels of
  // different types under one declaration.
  @SuppressWarnings({"unchecked", "rawtypes"})
  public synchronized <T extends SelectableChannel> ChannelAction<T> registerAction(int port, ConnectionType type, ChannelListener<T> listener, int ops) throws IOException
  {
    boolean startProc = channels.isEmpty();
    SocketAddress address = new InetSocketAddress(port);
    ConnectionInfo connectionInfo = new ConnectionInfo();
    connectionInfo.address = address;
    connectionInfo.connectionType = type;
    ChannelConfiguration channelConfiguration = channels.get(connectionInfo);
    if (channelConfiguration == null) {
      SelectableChannel channel = openChannel(type, address);
      channelConfiguration = new ChannelConfiguration();
      channelConfiguration.actions = new ConcurrentLinkedQueue<ChannelAction>();
      channelConfiguration.channel = channel;
      channelConfiguration.connectionInfo = connectionInfo;
      channels.put(connectionInfo, channelConfiguration);
      channelConfigurations.put(channel, channelConfiguration);
    }
    ChannelAction channelAction = new ChannelAction();
    channelAction.channelConfiguration = channelConfiguration;
    channelAction.listener = listener;
    channelAction.ops = ops;
    channelConfiguration.actions.add(channelAction);
    if (startProc) {
      startProcess();
    }
    if (listener != null) {
      refreshInterest(channelConfiguration);
    }
    return channelAction;
  }

  /**
   * Removes a registration. The channel is closed when its last action is removed, and the
   * selector thread is stopped when no channel remains. A {@code null} action is ignored.
   *
   * @throws IOException if closing the channel or the selector fails
   * @throws InterruptedException if interrupted while waiting for the selector thread to exit
   */
  public synchronized void unregisterAction(ChannelAction<?> action) throws IOException, InterruptedException
  {
    if (action == null) {
      return;
    }
    ChannelConfiguration<?> channelConfiguration = action.channelConfiguration;
    if (channelConfiguration != null) {
      SelectableChannel channel = channelConfiguration.channel;
      channelConfiguration.actions.remove(action);
      if (channelConfiguration.actions.isEmpty()) {
        ConnectionInfo connectionInfo = channelConfiguration.connectionInfo;
        channelConfigurations.remove(channel);
        // Identity check: a stale action must not evict a newer configuration that was
        // registered for the same address after this one was torn down.
        if (channels.get(connectionInfo) == channelConfiguration) {
          channels.remove(connectionInfo);
        }
        channel.close();
      } else if (selThread != null && channel.isOpen()) {
        // Drop the interest bits that only the removed action needed.
        refreshInterest(channelConfiguration);
      }
    }
    if (channels.isEmpty() && selThread != null) {
      stopProcess();
    }
  }

  /**
   * Opens a non-blocking channel of the requested type and binds it to {@code address}. The
   * channel is closed again when configuring or binding fails.
   */
  private static SelectableChannel openChannel(ConnectionType type, SocketAddress address) throws IOException
  {
    SelectableChannel channel;
    if (type == ConnectionType.TCP) {
      channel = SocketChannel.open();
    } else if (type == ConnectionType.UDP) {
      channel = DatagramChannel.open();
    } else {
      throw new IOException("Unsupported connection type");
    }
    boolean bound = false;
    try {
      channel.configureBlocking(false);
      if (type == ConnectionType.TCP) {
        Socket ssocket = ((SocketChannel)channel).socket();
        ssocket.bind(address);
      } else {
        DatagramSocket dsocket = ((DatagramChannel)channel).socket();
        dsocket.bind(address);
      }
      bound = true;
      return channel;
    } finally {
      if (!bound) {
        try {
          channel.close();
        } catch (IOException closeEx) {
          // The bind failure is the error worth reporting; the close failure is only logged.
          logger.warn("Error closing channel after failed bind on {}", address, closeEx);
        }
      }
    }
  }

  /**
   * Sets the channel's selector interest to the union of the ops of all actions that have a
   * listener, registering the channel with the selector if it is not registered yet.
   */
  @SuppressWarnings("rawtypes")
  private void refreshInterest(ChannelConfiguration<?> channelConfiguration) throws IOException
  {
    int interest = 0;
    boolean hasListener = false;
    for (ChannelAction registered : channelConfiguration.actions) {
      if (registered.listener != null) {
        hasListener = true;
        interest |= registered.ops;
      }
    }
    SelectableChannel channel = channelConfiguration.channel;
    SelectionKey key = channel.keyFor(selector);
    if (key == null && !hasListener) {
      return;
    }
    // NOTE(review): on some JDK versions register()/interestOps() can block while another
    // thread sits in select(); waking the selector first keeps that wait short.
    selector.wakeup();
    if (key == null) {
      channel.register(selector, interest);
    } else if (key.isValid()) {
      key.interestOps(interest);
    }
  }

  private void startProcess() throws IOException
  {
    selector = Selector.open();
    doRun = true;
    selThread = new Thread(this);
    selThread.start();
  }

  private void stopProcess() throws InterruptedException, IOException
  {
    doRun = false;
    // Wake the selector so the thread notices doRun immediately instead of after selTimeout.
    selector.wakeup();
    selThread.join();
    selThread = null;
    selector.close();
  }

  /**
   * Selector loop: waits for ready keys and invokes every listener of the ready channel whose
   * ops intersect the key's ready set. An {@link IOException} from the selector ends the loop
   * and is recorded in {@code selEx}.
   */
  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public void run()
  {
    try {
      while (doRun) {
        int keys = selector.select(selTimeout);
        if (keys > 0) {
          Set<SelectionKey> selectionKeys = selector.selectedKeys();
          for (SelectionKey selectionKey : selectionKeys) {
            int readyOps;
            try {
              readyOps = selectionKey.readyOps();
            } catch (CancelledKeyException e) {
              // The channel was closed by unregisterAction() after select() returned.
              continue;
            }
            ChannelConfiguration channelConfiguration = channelConfigurations.get(selectionKey.channel());
            if (channelConfiguration == null) {
              // Already unregistered; nothing left to notify.
              continue;
            }
            Collection<ChannelAction> actions = channelConfiguration.actions;
            for (ChannelAction action : actions) {
              if (((readyOps & action.ops) != 0) && (action.listener != null)) {
                action.listener.ready(action, readyOps);
              }
            }
          }
          selectionKeys.clear();
        }
      }
    } catch (IOException e) {
      logger.error("Error in select", e);
      selEx = e;
    }
  }

  /**
   * Callback invoked on the selector thread when a channel is ready for one of the registered
   * ops.
   */
  public static interface ChannelListener<T extends SelectableChannel>
  {
    public void ready(ChannelAction<T> action, int readyOps);
  }

  /**
   * A bound channel together with the actions registered on it.
   */
  public static class ChannelConfiguration<T extends SelectableChannel>
  {
    public T channel;
    public ConnectionInfo connectionInfo;
    public Collection<ChannelAction> actions;
  }

  /**
   * One registration: the shared channel configuration, the optional listener and its ops.
   */
  public static class ChannelAction<T extends SelectableChannel>
  {
    public ChannelConfiguration<T> channelConfiguration;
    public ChannelListener<T> listener;
    public int ops;
  }

  /**
   * Map key identifying a channel by local address and connection type.
   */
  private static class ConnectionInfo
  {
    public SocketAddress address;
    public ConnectionType connectionType;

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      ConnectionInfo that = (ConnectionInfo)o;

      if (connectionType != that.connectionType) {
        return false;
      }
      if (!address.equals(that.address)) {
        return false;
      }

      return true;
    }

    @Override
    public int hashCode()
    {
      int result = address.hashCode();
      result = 31 * result + connectionType.hashCode();
      return result;
    }
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/echoserver/src/main/resources/META-INF/properties.xml b/examples/echoserver/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..0822f4f --- /dev/null +++ b/examples/echoserver/src/main/resources/META-INF/properties.xml @@ -0,0 +1,38 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+ +--> +<configuration> + <!-- + <property> + <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> + </property> + --> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name> + <value>hello world: %s</value> + </property> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java b/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java new file mode 100644 index 0000000..c4a3ad4 --- /dev/null +++ b/examples/echoserver/src/test/java/org/apache/apex/examples/echoserver/ApplicationTest.java @@ -0,0 +1,53 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.echoserver;

import java.io.IOException;

import javax.validation.ConstraintViolationException;

import org.junit.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.LocalMode;

/**
 * Smoke test: builds the echo server DAG in local mode and lets it run briefly.
 */
public class ApplicationTest
{

  /**
   * Prepares the DAG from {@code /META-INF/properties.xml} and runs it for ten seconds. A
   * constraint violation raised along the way is reported as a test failure.
   */
  @Test
  public void testApplication() throws IOException, Exception
  {
    try {
      LocalMode localMode = LocalMode.newInstance();
      Configuration configuration = new Configuration(false);
      configuration.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
      localMode.prepareDAG(new Application(), configuration);
      LocalMode.Controller controller = localMode.getController();
      // run() takes milliseconds: execute for 10 seconds, then shut down
      controller.run(10000);
    } catch (ConstraintViolationException violation) {
      Assert.fail("constraint violations: " + violation.getConstraintViolations());
    }
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/echoserver/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/echoserver/src/test/resources/log4j.properties b/examples/echoserver/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/echoserver/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/pom.xml ---------------------------------------------------------------------- diff --git a/examples/frauddetect/pom.xml b/examples/frauddetect/pom.xml new file mode 100644 index 0000000..36d4153 --- /dev/null +++ b/examples/frauddetect/pom.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-frauddetect</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Fraud Detect Example</name> + <description>Apex example application that demonstrates real-time pattern detection in the incoming data and alerting. 
The example processes streaming credit card transactions and looks for fraudulent transactions.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <version>2.10.1</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/assemble/appPackage.xml b/examples/frauddetect/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/frauddetect/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java new file mode 100644 index 0000000..73c38ef --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/Application.java @@ -0,0 +1,151 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.frauddetect;

import java.io.Serializable;
import java.net.URI;
import org.apache.apex.examples.frauddetect.operator.HdfsStringOutputOperator;
import org.apache.apex.examples.frauddetect.operator.MongoDBOutputOperator;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
import com.datatorrent.lib.math.RangeKeyVal;
import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
import com.datatorrent.lib.util.BaseKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.netlet.util.DTThrowable;


/**
 * Fraud detection application.
 *
 * <p>{@link #populateDAG} wires transaction sources (a generator and a pub/sub web socket input)
 * through bucketing, range, moving-average and sampling operators into MongoDB, HDFS and pub/sub
 * web socket outputs.
 *
 * @since 0.9.0
 */
@ApplicationAnnotation(name = "FraudDetectExample")
public class Application implements StreamingApplication
{

  /**
   * Adds a pub/sub web socket input operator named {@code name} to the DAG, connected to
   * {@code duri} and subscribed to {@code topic}.
   */
  public PubSubWebSocketInputOperator getPubSubWebSocketInputOperator(String name, DAG dag, URI duri, String topic) throws Exception
  {
    PubSubWebSocketInputOperator reqin = dag.addOperator(name, new PubSubWebSocketInputOperator());
    reqin.setUri(duri);
    reqin.setTopic(topic);
    return reqin;
  }

  /**
   * Adds a pub/sub web socket output operator named {@code name} to the DAG, connected to
   * {@code duri} and publishing to {@code topic}.
   */
  public PubSubWebSocketOutputOperator getPubSubWebSocketOutputOperator(String name, DAG dag, URI duri, String topic) throws Exception
  {
    PubSubWebSocketOutputOperator out = dag.addOperator(name, new PubSubWebSocketOutputOperator());
    out.setUri(duri);
    // The topic argument used to be dropped here, unlike in the input factory above, leaving
    // every output operator without the topic its caller asked for.
    out.setTopic(topic);
    return out;
  }

  /**
   * Adds the HDFS string output operator, writing under {@code folderName} with a maximum
   * length of 1 GiB.
   */
  public HdfsStringOutputOperator getHdfsOutputOperator(String name, DAG dag, String folderName)
  {
    // NOTE(review): `name` is ignored and the operator is always registered as "hdfs". Left
    // unchanged because existing configuration may refer to that operator name - confirm
    // before switching to `name`.
    HdfsStringOutputOperator oper = dag.addOperator("hdfs", HdfsStringOutputOperator.class);
    oper.setFilePath(folderName);
    oper.setMaxLength(1024 * 1024 * 1024);
    return oper;
  }

  /**
   * Adds a console output operator named {@code name}. {@code prefix} and {@code format} are
   * currently unused because the string-format call below is disabled.
   */
  public ConsoleOutputOperator getConsoleOperator(String name, DAG dag, String prefix, String format)
  {
    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
    // oper.setStringFormat(prefix + ": " + format);
    return oper;
  }

  /**
   * Serializable key-based partition codec used as the stream codec for the per-merchant
   * streams.
   */
  public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable
  {
    private static final long serialVersionUID = 201410031623L;
  }

  /**
   * Builds the fraud detection DAG. The gateway address is taken from the DAG attribute
   * {@code GATEWAY_CONNECT_ADDRESS}, falling back to {@code localhost:9090}. Any exception
   * raised while building is rethrown through {@link DTThrowable#rethrow}.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    try {
      String gatewayAddress = dag.getValue(DAGContext.GATEWAY_CONNECT_ADDRESS);
      if (gatewayAddress == null) {
        gatewayAddress = "localhost:9090";
      }
      URI duri = URI.create("ws://" + gatewayAddress + "/pubsub");

      PubSubWebSocketInputOperator userTxWsInput = getPubSubWebSocketInputOperator("userTxInput", dag, duri, "examples.app.frauddetect.submitTransaction");
      PubSubWebSocketOutputOperator ccUserAlertWsOutput = getPubSubWebSocketOutputOperator("ccUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
      PubSubWebSocketOutputOperator avgUserAlertwsOutput = getPubSubWebSocketOutputOperator("avgUserAlertQueryOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
      PubSubWebSocketOutputOperator binUserAlertwsOutput = getPubSubWebSocketOutputOperator("binUserAlertOutput", dag, duri, "examples.app.frauddetect.fraudAlert");
      PubSubWebSocketOutputOperator txSummaryWsOutput = getPubSubWebSocketOutputOperator("txSummaryWsOutput", dag, duri, "examples.app.frauddetect.txSummary");
      SlidingWindowSumKeyVal<KeyValPair<MerchantKey, String>, Integer> smsOperator = dag.addOperator("movingSum", SlidingWindowSumKeyVal.class);

      MerchantTransactionGenerator txReceiver = dag.addOperator("txReceiver", MerchantTransactionGenerator.class);
      MerchantTransactionInputHandler txInputHandler = dag.addOperator("txInputHandler", new MerchantTransactionInputHandler());
      BankIdNumberSamplerOperator binSampler = dag.addOperator("bankInfoFraudDetector", BankIdNumberSamplerOperator.class);

      MerchantTransactionBucketOperator txBucketOperator = dag.addOperator("txFilter", MerchantTransactionBucketOperator.class);
      RangeKeyVal rangeOperator = dag.addOperator("rangePerMerchant", new RangeKeyVal<MerchantKey, Long>());
      SimpleMovingAverage<MerchantKey, Long> smaOperator = dag.addOperator("smaPerMerchant", SimpleMovingAverage.class);
      TransactionStatsAggregator txStatsAggregator = dag.addOperator("txStatsAggregator", TransactionStatsAggregator.class);
      AverageAlertingOperator avgAlertingOperator = dag.addOperator("avgAlerter", AverageAlertingOperator.class);
      CreditCardAmountSamplerOperator ccSamplerOperator = dag.addOperator("amountFraudDetector", CreditCardAmountSamplerOperator.class);
      HdfsStringOutputOperator hdfsOutputOperator = getHdfsOutputOperator("hdfsOutput", dag, "fraud");

      MongoDBOutputOperator mongoTxStatsOperator = dag.addOperator("mongoTxStatsOutput", MongoDBOutputOperator.class);
      MongoDBOutputOperator mongoBinAlertsOperator = dag.addOperator("mongoBinAlertsOutput", MongoDBOutputOperator.class);
      MongoDBOutputOperator mongoCcAlertsOperator = dag.addOperator("mongoCcAlertsOutput", MongoDBOutputOperator.class);
      MongoDBOutputOperator mongoAvgAlertsOperator = dag.addOperator("mongoAvgAlertsOutput", MongoDBOutputOperator.class);

      dag.addStream("userTxStream", userTxWsInput.outputPort, txInputHandler.userTxInputPort);
      dag.addStream("transactions", txReceiver.txOutputPort, txBucketOperator.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
      dag.addStream("txData", txReceiver.txDataOutputPort, hdfsOutputOperator.input); // dump all tx into Hdfs
      dag.addStream("userTransactions", txInputHandler.txOutputPort, txBucketOperator.txUserInputPort);
      dag.addStream("bankInfoData", txBucketOperator.binCountOutputPort, smsOperator.data);
      dag.addStream("bankInfoCount", smsOperator.integerSum, binSampler.txCountInputPort);
      dag.addStream("filteredTransactions", txBucketOperator.txOutputPort, rangeOperator.data, smaOperator.data, avgAlertingOperator.txInputPort);

      // The three consumers of "filteredTransactions" share one key-based codec instance.
      KeyPartitionCodec<MerchantKey, Long> txCodec = new KeyPartitionCodec<MerchantKey, Long>();
      dag.setInputPortAttribute(rangeOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
      dag.setInputPortAttribute(smaOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
      dag.setInputPortAttribute(avgAlertingOperator.txInputPort, Context.PortContext.STREAM_CODEC, txCodec);

      dag.addStream("creditCardData", txBucketOperator.ccAlertOutputPort, ccSamplerOperator.inputPort);
      dag.addStream("txnSummaryData", txBucketOperator.summaryTxnOutputPort, txSummaryWsOutput.input);
      dag.addStream("smaAlerts", smaOperator.doubleSMA, avgAlertingOperator.smaInputPort);
      dag.addStream("binAlerts", binSampler.countAlertOutputPort, mongoBinAlertsOperator.inputPort);
      dag.addStream("binAlertsNotification", binSampler.countAlertNotificationPort, binUserAlertwsOutput.input);
      dag.addStream("rangeData", rangeOperator.range, txStatsAggregator.rangeInputPort);
      dag.addStream("smaData", smaOperator.longSMA, txStatsAggregator.smaInputPort);
      dag.addStream("txStatsOutput", txStatsAggregator.txDataOutputPort, mongoTxStatsOperator.inputPort);
      dag.addStream("avgAlerts", avgAlertingOperator.avgAlertOutputPort, mongoAvgAlertsOperator.inputPort);
      dag.addStream("avgAlertsNotification", avgAlertingOperator.avgAlertNotificationPort, avgUserAlertwsOutput.input);
      dag.addStream("ccAlerts", ccSamplerOperator.ccAlertOutputPort, mongoCcAlertsOperator.inputPort);
      dag.addStream("ccAlertsNotification", ccSamplerOperator.ccAlertNotificationPort, ccUserAlertWsOutput.input);

    } catch (Exception exc) {
      DTThrowable.rethrow(exc);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java new file mode 100644 index 0000000..6aca64d --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/AverageAlertData.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the
 + * specific language governing permissions and limitations + * under the License.
/**
 * POJO to capture average alert data.
 *
 * <p>Instances are filled in by {@code AverageAlertingOperator} from the merchant key and the
 * transaction that triggered the alert.
 *
 * @since 0.9.0
 */
public class AverageAlertData
{
  // Merchant id copied from the merchant key.
  public String merchantId;
  // Terminal id copied from the merchant key; the operator stores 0 when the key has none.
  public int terminalId;
  // Zip code copied from the merchant key.
  public int zipCode;
  // Merchant type copied from the merchant key.
  public MerchantTransaction.MerchantType merchantType;
  // Transaction amount that triggered the alert.
  public long amount;
  // Moving-average value the amount was compared against.
  public double lastSmaValue;
  // Difference between the amount and lastSmaValue.
  public double change;
  // Copied from the merchant key's userGenerated flag.
  public boolean userGenerated;
  // Wall-clock time (milliseconds) at which the alert record was created.
  public long time;
}
+ */ +package org.apache.apex.examples.frauddetect; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.validation.constraints.NotNull; + +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.examples.frauddetect.util.JsonUtils; +import org.apache.commons.lang.mutable.MutableDouble; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * Generate an alert if the current transaction amount received on tx input port for the given key is greater by n % + * than the SMA of the last application window as received on the SMA input port. + * + * @since 0.9.0 + */ +public class AverageAlertingOperator extends BaseOperator +{ + private static final Logger Log = LoggerFactory.getLogger(AverageAlertingOperator.class); + private final transient JsonFactory jsonFactory = new JsonFactory(); + private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); + private Map<MerchantKey, MutableDouble> lastSMAMap = new HashMap<MerchantKey, MutableDouble>(); + private Map<MerchantKey, MutableDouble> currentSMAMap = new HashMap<MerchantKey, MutableDouble>(); + private List<AverageAlertData> alerts = new ArrayList<AverageAlertData>(); + @NotNull + private int threshold; + private static final String brickMortarAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s at Terminal %d!"; + private static final String internetAlertMsg = "Transaction amount %d exceeded by %f (last SMA %f) for Merchant %s!"; + public final transient DefaultOutputPort<String> avgAlertOutputPort = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<Map<String, Object>> avgAlertNotificationPort = new 
DefaultOutputPort<Map<String, Object>>(); + public final transient DefaultInputPort<KeyValPair<MerchantKey, Double>> smaInputPort = + new DefaultInputPort<KeyValPair<MerchantKey, Double>>() + { + @Override + public void process(KeyValPair<MerchantKey, Double> tuple) + { + MutableDouble currentSma = currentSMAMap.get(tuple.getKey()); + if (currentSma == null) { // first sma for the given key + double sma = tuple.getValue(); + currentSMAMap.put(tuple.getKey(), new MutableDouble(sma)); + //lastSMAMap.put(tuple.getKey(), new MutableDouble(sma)); + } else { // move the current SMA value to the last SMA Map + //lastSMAMap.get(tuple.getKey()).setValue(currentSma.getValue()); + currentSma.setValue(tuple.getValue()); // update the current SMA value + } + } + + }; + public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> txInputPort = + new DefaultInputPort<KeyValPair<MerchantKey, Long>>() + { + @Override + public void process(KeyValPair<MerchantKey, Long> tuple) + { + processTuple(tuple); + } + + }; + + private void processTuple(KeyValPair<MerchantKey, Long> tuple) + { + MerchantKey merchantKey = tuple.getKey(); + MutableDouble lastSma = lastSMAMap.get(tuple.getKey()); + long txValue = tuple.getValue(); + if (lastSma != null && txValue > lastSma.doubleValue()) { + double lastSmaValue = lastSma.doubleValue(); + double change = txValue - lastSmaValue; + if (change > threshold) { // generate an alert + AverageAlertData data = getOutputData(merchantKey, txValue, change, lastSmaValue); + alerts.add(data); + //if (userGenerated) { // if its user generated only the pass it to WebSocket + if (merchantKey.merchantType == MerchantTransaction.MerchantType.BRICK_AND_MORTAR) { + avgAlertNotificationPort.emit(getOutputData(data, String.format(brickMortarAlertMsg, txValue, change, lastSmaValue, merchantKey.merchantId, merchantKey.terminalId))); + } else { // its internet based + avgAlertNotificationPort.emit(getOutputData(data, String.format(internetAlertMsg, txValue, 
change, lastSmaValue, merchantKey.merchantId))); + + } + //} + } + } + } + + @Override + public void endWindow() + { + for (AverageAlertData data : alerts) { + try { + avgAlertOutputPort.emit(JsonUtils.toJson(data)); + } catch (IOException e) { + logger.warn("Exception while converting object to JSON", e); + } + } + + alerts.clear(); + + for (Map.Entry<MerchantKey, MutableDouble> entry : currentSMAMap.entrySet()) { + MerchantKey key = entry.getKey(); + MutableDouble currentSma = entry.getValue(); + MutableDouble lastSma = lastSMAMap.get(key); + if (lastSma == null) { + lastSma = new MutableDouble(currentSma.doubleValue()); + lastSMAMap.put(key, lastSma); + } else { + lastSma.setValue(currentSma.getValue()); + } + } + } + + private AverageAlertData getOutputData(MerchantKey key, long amount, double change, double lastSmaValue) + { + AverageAlertData data = new AverageAlertData(); + + data.merchantId = key.merchantId; + data.terminalId = key.terminalId == null ? 0 : key.terminalId; + data.zipCode = key.zipCode; + data.merchantType = key.merchantType; + data.amount = amount; + data.lastSmaValue = lastSmaValue; + data.change = change; + //data.userGenerated = userGenerated; + data.userGenerated = key.userGenerated; + data.time = System.currentTimeMillis(); + + return data; + } + + private Map<String, Object> getOutputData(AverageAlertData data, String msg) + { + Map<String, Object> output = new HashMap<String, Object>(); + output.put("message", msg); + output.put("alertType", "aboveAvg"); + output.put("userGenerated", "" + data.userGenerated); + output.put("alertData", data); + + try { + String str = mapper.writeValueAsString(output); + logger.debug("user generated tx alert: " + str); + } catch (Exception exc) { + //ignore + } + return output; + } + + public int getThreshold() + { + return threshold; + } + + public void setThreshold(int threshold) + { + this.threshold = threshold; + } + + private static final Logger logger = 
LoggerFactory.getLogger(AverageAlertingOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java new file mode 100644 index 0000000..28cd19a --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberAlertData.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +/** + * POJO to capture data related to alerts for repetitive bank id number data usage. 
/**
 * POJO to capture data related to alerts for repetitive bank id number data usage.
 *
 * <p>Instances are filled in by {@code BankIdNumberSamplerOperator} at the end of a window.
 *
 * @since 0.9.0
 */
public class BankIdNumberAlertData
{
  // Merchant id copied from the merchant key.
  public String merchantId;
  // Terminal id copied from the merchant key; the operator stores 0 when the key has none.
  public int terminalId;
  // Zip code copied from the merchant key.
  public int zipCode;
  // Merchant type copied from the merchant key.
  public MerchantTransaction.MerchantType merchantType;
  // Bank id number whose transaction count exceeded the operator's threshold.
  public String bankIdNum;
  // Transaction count recorded for bankIdNum.
  public int count;
  // True when a user generated transaction contributed to the count.
  public boolean userGenerated;
  // Wall-clock time (milliseconds) at which the alert record was created.
  public long time;
}
+ */ +package org.apache.apex.examples.frauddetect; + +import java.io.Serializable; + +import com.datatorrent.lib.util.TimeBucketKey; + +/** + * Bank Id Number Key + * + * @since 0.9.0 + */ +public class BankIdNumberKey extends TimeBucketKey implements Serializable +{ + public String bankIdNum; + + public BankIdNumberKey() + { + } + + @Override + public int hashCode() + { + int key = 0; + key |= (1 << 1); + key |= (bankIdNum.hashCode()); + return super.hashCode() ^ key; + } + + @Override + public boolean equals(Object obj) + { + if (!(obj instanceof BankIdNumberKey)) { + return false; + } + return super.equals(obj) + && bankIdNum.equals(((BankIdNumberKey)obj).bankIdNum); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(super.toString()); + sb.append("|1:").append(bankIdNum); + return sb.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java new file mode 100644 index 0000000..0731e3c --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/BankIdNumberSamplerOperator.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.examples.frauddetect.util.JsonUtils; + +import org.apache.commons.lang.mutable.MutableLong; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.lib.util.KeyValPair; + +/** + * Count the transactions for the underlying aggregation window if the same BIN is + * being used for more than defined number of transactions. Output the data as needed + * by Mongo output operator + * + * @since 0.9.0 + */ +public class BankIdNumberSamplerOperator extends BaseOperator +{ + private final transient JsonFactory jsonFactory = new JsonFactory(); + private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); + private int threshold; + private Map<MerchantKey, Map<String, BankIdNumData>> bankIdNumCountMap = new HashMap<MerchantKey, Map<String, BankIdNumData>>(); + private static final String ALERT_MSG = + "Potential fraudulent CC transactions (same bank id %s and merchant %s) total transactions: %d"; + /** + * Output the key-value pair for the BIN as key with the count as value. 
+ */ + public final transient DefaultOutputPort<String> countAlertOutputPort = + new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<Map<String, Object>> countAlertNotificationPort = + new DefaultOutputPort<Map<String, Object>>(); + + public int getThreshold() + { + return threshold; + } + + public void setThreshold(int threshold) + { + this.threshold = threshold; + } + + /* + public final transient DefaultInputPort<KeyValPair<MerchantKey, String>> txInputPort = + new DefaultInputPort<KeyValPair<MerchantKey, String>>() + { + @Override + public void process(KeyValPair<MerchantKey, String> tuple) + { + processTuple(tuple); + } + + }; + + private void processTuple(KeyValPair<MerchantKey, String> tuple) + { + MerchantKey key = tuple.getKey(); + Map<String, BankIdNumData> map = bankIdNumCountMap.get(key); + if (map == null) { + map = new HashMap<String, BankIdNumData>(); + bankIdNumCountMap.put(key, map); + } + String bankIdNum = tuple.getValue(); + BankIdNumData bankIdNumData = map.get(bankIdNum); + if (bankIdNumData == null) { + bankIdNumData = new BankIdNumData(); + bankIdNumData.bankIdNum = bankIdNum; + map.put(bankIdNum, bankIdNumData); + } + bankIdNumData.count.increment(); + if (key.userGenerated) { + bankIdNumData.userGenerated = true; + } + } + */ + + public final transient DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> txCountInputPort = + new DefaultInputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>() + { + @Override + public void process(KeyValPair<KeyValPair<MerchantKey, String>, Integer> tuple) + { + processTuple(tuple.getKey(), tuple.getValue()); + } + + }; + + private void processTuple(KeyValPair<MerchantKey, String> tuple, Integer count) + { + MerchantKey key = tuple.getKey(); + Map<String, BankIdNumData> map = bankIdNumCountMap.get(key); + if (map == null) { + map = new HashMap<String, BankIdNumData>(); + bankIdNumCountMap.put(key, map); + } + String bankIdNum = tuple.getValue(); + 
BankIdNumData bankIdNumData = map.get(bankIdNum); + if (bankIdNumData == null) { + bankIdNumData = new BankIdNumData(); + bankIdNumData.bankIdNum = bankIdNum; + map.put(bankIdNum, bankIdNumData); + } + bankIdNumData.count.setValue(count); + if (key.userGenerated) { + bankIdNumData.userGenerated = true; + } + } + + /** + * Go through the BIN Counter map and check if any of the values for the BIN exceed the threshold. + * If yes, generate the alert on the output port. + */ + @Override + public void endWindow() + { + for (Map.Entry<MerchantKey, Map<String, BankIdNumData>> entry : bankIdNumCountMap.entrySet()) { + List<BankIdNumData> list = null; + MerchantKey key = entry.getKey(); + if (key.merchantType == MerchantTransaction.MerchantType.INTERNET) { + continue; + } + list = dataOutput(entry.getValue()); + if (list.size() > 0) { + for (BankIdNumData binData : list) { + BankIdNumberAlertData data = new BankIdNumberAlertData(); + data.merchantId = key.merchantId; + data.terminalId = key.terminalId == null ? 
0 : key.terminalId; + data.zipCode = key.zipCode; + data.merchantType = key.merchantType; + data.bankIdNum = binData.bankIdNum; + data.count = binData.count.intValue(); + data.userGenerated = binData.userGenerated; + data.time = System.currentTimeMillis(); + try { + countAlertOutputPort.emit(JsonUtils.toJson(data)); + countAlertNotificationPort.emit(getOutputData(data)); + } catch (IOException e) { + logger.warn("Exception while converting object to JSON: ", e); + } + } + } + } + bankIdNumCountMap.clear(); + } + + private List<BankIdNumData> dataOutput(Map<String, BankIdNumData> map) + { + List<BankIdNumData> list = new ArrayList<BankIdNumData>(); + int count = 0; + for (Map.Entry<String, BankIdNumData> bankIdEntry : map.entrySet()) { + BankIdNumData data = bankIdEntry.getValue(); + if (data.count.intValue() > threshold) { + list.add(data); + } + } + return list; + } + + private Map<String, Object> getOutputData(BankIdNumberAlertData data) + { + Map<String, Object> output = new HashMap<String, Object>(); + output.put("message", String.format(ALERT_MSG, data.bankIdNum, data.merchantId, data.count)); + output.put("alertType", "sameBankId"); + output.put("userGenerated", "" + data.userGenerated); + output.put("alertData", data); + + try { + String str = mapper.writeValueAsString(output); + logger.debug("user generated tx alert: " + str); + } catch (Exception exc) { + //ignore + } + + return output; + } + + public static final class BankIdNumData + { + public String bankIdNum; + public MutableLong count = new MutableLong(); + public boolean userGenerated = false; + } + + private static final Logger logger = LoggerFactory.getLogger(BankIdNumberSamplerOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java ---------------------------------------------------------------------- diff --git 
a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java new file mode 100644 index 0000000..f46f84a --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardAlertData.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +/** + * POJO to capture data related to alerts for credit card number. 
/**
 * POJO to capture data related to alerts for credit card number.
 *
 * <p>Instances are filled in by {@code CreditCardAmountSamplerOperator} when a large transaction
 * follows a small one on the same card.
 *
 * @since 0.9.0
 */
public class CreditCardAlertData
{
  // Merchant id copied from the merchant key.
  public String merchantId;
  // Terminal id copied from the merchant key; the operator stores 0 when the key has none.
  public int terminalId;
  // Zip code copied from the merchant key.
  public int zipCode;
  // Merchant type copied from the merchant key.
  public MerchantTransaction.MerchantType merchantType;
  // Full credit card number the alert refers to.
  public String fullCcNum;
  // Lowest amount tracked for the card at the time of the alert.
  public long small;
  // Amount of the transaction that triggered the alert.
  public long large;
  // Operator threshold in effect when the alert was raised.
  public double threshold;
  // Copied from the merchant key's userGenerated flag.
  public boolean userGenerated;
  // Wall-clock time (milliseconds) at which the alert record was created.
  public long time;
}
+ */ +package org.apache.apex.examples.frauddetect; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.map.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.examples.frauddetect.util.JsonUtils; +import org.apache.commons.lang.mutable.MutableLong; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * An operator to alert in case a transaction of a small lowAmount is followed by a transaction which is significantly larger for a given credit card number. + * This is done for each transaction. This also means that this happens for each individual credit card. + * It accepts merchant transaction object and for each CC listed in the transaction(s), checks for the transaction amounts. An alert is raised if the transaction + * lowAmount is significantly > the lowest amt in this window. + * + * @since 0.9.0 + */ +public class CreditCardAmountSamplerOperator extends BaseOperator +{ + private final transient JsonFactory jsonFactory = new JsonFactory(); + private final transient ObjectMapper mapper = new ObjectMapper(jsonFactory); + private static final Logger logger = LoggerFactory.getLogger(Application.class); + // Factor to be applied to existing lowAmount to flag potential alerts. 
+ private double threshold = 9500; + private Map<String, CreditCardInfo> ccTxnMap = new HashMap<String, CreditCardInfo>(); + //private Map<String, MutableLong> ccQueryTxnMap = new HashMap<String, MutableLong>(); + private List<CreditCardAlertData> alerts = new ArrayList<CreditCardAlertData>(); + //private List<CreditCardAlertData> userAlerts = new ArrayList<CreditCardAlertData>(); + private static final String ALERT_MSG = + "Potential fraudulent CC transactions (small one USD %d followed by large USD %d) performed using credit card: %s"; + public final transient DefaultOutputPort<String> ccAlertOutputPort = new DefaultOutputPort<String>(); + /* + public final transient DefaultOutputPort<Map<String, Object>> ccUserAlertOutputPort = new DefaultOutputPort<Map<String, Object>>(); + */ + public final transient DefaultOutputPort<Map<String, Object>> ccAlertNotificationPort = new DefaultOutputPort<Map<String, Object>>(); + + public double getThreshold() + { + return threshold; + } + + public void setThreshold(double threshold) + { + this.threshold = threshold; + } + + private void processTuple(KeyValPair<MerchantKey, CreditCardData> tuple, Map<String, CreditCardInfo> txMap) + { + String fullCcNum = tuple.getValue().fullCcNum; + long ccAmount = tuple.getValue().amount; + MerchantKey key = tuple.getKey(); + + CreditCardInfo cardInfo = txMap.get(fullCcNum); + + if (cardInfo != null) { + long currentSmallValue = cardInfo.lowAmount.longValue(); + if (ccAmount < currentSmallValue) { + cardInfo.lowAmount.setValue(ccAmount); + cardInfo.time = key.time; + } else if (ccAmount > (currentSmallValue + threshold)) { + // If the transaction lowAmount is > 70% of the min. lowAmount, send an alert. + + CreditCardAlertData data = new CreditCardAlertData(); + + data.merchantId = key.merchantId; + data.terminalId = key.terminalId == null ? 
0 : key.terminalId; + data.zipCode = key.zipCode; + data.merchantType = key.merchantType; + data.fullCcNum = fullCcNum; + data.small = currentSmallValue; + data.large = ccAmount; + data.threshold = threshold; + data.userGenerated = key.userGenerated; + data.time = System.currentTimeMillis(); + + alerts.add(data); + + /* + if (userGenerated){ + userAlerts.add(data); + } + */ + ccAlertNotificationPort.emit(getOutputData(data)); + + // Any high value transaction after a low value transaction with difference greater than threshold + // will trigger the alert. Not resetting the low value also helps in a system generated transaction + // alert not resetting the low value from a user generated transaction + //txMap.remove(fullCcNum); + } + } else { + cardInfo = new CreditCardInfo(); + cardInfo.lowAmount.setValue(ccAmount); + cardInfo.time = key.time; + txMap.put(fullCcNum, cardInfo); + } + } + + public transient DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>> inputPort = + new DefaultInputPort<KeyValPair<MerchantKey, CreditCardData>>() + { + // + // This function checks if a CC entry exists. + // If so, it checks whether the current transaction is for an lowAmount lesser than the one stored in the hashmap. If so, this becomes the min. transaction lowAmount. + // If the lowAmount is > 70% of the existing lowAmount in the hash map, raise an alert. 
+ // + @Override + public void process(KeyValPair<MerchantKey, CreditCardData> tuple) + { + + processTuple(tuple, ccTxnMap); + + } + + }; + + @Override + public void endWindow() + { + + for (CreditCardAlertData data : alerts) { + try { + ccAlertOutputPort.emit(JsonUtils.toJson(data)); + } catch (IOException e) { + logger.warn("Exception while converting object to JSON", e); + } + } + + //for (CreditCardAlertData data: userAlerts) { + /*for (CreditCardAlertData data: alerts) { + ccAlertNotificationPort.emit(getOutputData(data)); + }*/ + + long ctime = System.currentTimeMillis(); + Iterator<Map.Entry<String, CreditCardInfo>> iterator = ccTxnMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, CreditCardInfo> entry = iterator.next(); + long time = entry.getValue().time; + if ((ctime - time) > 60000) { + iterator.remove(); + } + } + + //ccTxnMap.clear(); + alerts.clear(); + + //ccQueryTxnMap.clear(); + //userAlerts.clear(); + } + + private static class CreditCardInfo + { + MutableLong lowAmount = new MutableLong(); + Long time; + } + + private Map<String, Object> getOutputData(CreditCardAlertData data) + { + Map<String, Object> output = new HashMap<String, Object>(); + output.put("message", String.format(ALERT_MSG, data.small, data.large, data.fullCcNum)); + output.put("alertType", "smallThenLarge"); + output.put("userGenerated", "" + data.userGenerated); + output.put("alertData", data); + + try { + String str = mapper.writeValueAsString(output); + logger.debug("Alert generated: " + str + " userGenerated: " + data.userGenerated); + } catch (Exception exc) { + //ignore + } + + return output; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java 
b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java new file mode 100644 index 0000000..7c667d6 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/CreditCardData.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.apex.examples.frauddetect;
+
+/**
+ * Credit Card Data: a plain holder that pairs a card number string with a numeric amount.
+ *
+ * Both members are public fields that are read and written directly; the class defines
+ * no accessors and no equals/hashCode of its own.
+ *
+ * @since 0.9.0
+ */
+public class CreditCardData
+{
+  // Card identifier carried with the amount; presumably the complete card number -- confirm with the producer.
+  public String fullCcNum;
+  // Amount associated with the card; currency and units are not visible in this class -- TODO confirm.
+  public long amount;
+
+  // No-argument constructor: both fields keep their Java defaults (null and 0) until assigned.
+  public CreditCardData()
+  {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
new file mode 100644
index 0000000..d73c693
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantKey.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+
+import java.io.Serializable;
+
+/**
+ * A time-based key for merchant data.
+ *
+ * <p>Two keys are equal when merchantId, terminalId, zipCode, country and merchantType all
+ * match; a null field only matches another null. The {@code time} and {@code userGenerated}
+ * fields are carried along but take no part in {@link #equals(Object)} or {@link #hashCode()}.
+ *
+ * @since 0.9.0
+ */
+public class MerchantKey implements Serializable
+{
+  public String merchantId;
+  public Integer terminalId;
+  public Integer zipCode;
+  public String country;
+  public MerchantTransaction.MerchantType merchantType;
+  public Long time;
+  public boolean userGenerated;
+
+  public MerchantKey()
+  {
+  }
+
+  /**
+   * Combines the same five fields that {@link #equals(Object)} compares.
+   *
+   * <p>The enum contributes the hash of its constant name instead of {@code Enum.hashCode()}:
+   * the latter is identity based, so it may differ from one JVM to the next, while the name
+   * hash is the same in every process.
+   *
+   * @return a hash code consistent with {@link #equals(Object)}
+   */
+  @Override
+  public int hashCode()
+  {
+    int result = 17;
+    result = 31 * result + (merchantId == null ? 0 : merchantId.hashCode());
+    result = 31 * result + (terminalId == null ? 0 : terminalId.hashCode());
+    result = 31 * result + (zipCode == null ? 0 : zipCode.hashCode());
+    result = 31 * result + (country == null ? 0 : country.hashCode());
+    result = 31 * result + (merchantType == null ? 0 : merchantType.name().hashCode());
+    return result;
+  }
+
+  /**
+   * Compares merchantId, terminalId, zipCode, country and merchantType.
+   *
+   * @param obj the object to compare with; may be null
+   * @return true when {@code obj} is a MerchantKey whose five identifying fields all match
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof MerchantKey)) {
+      return false;
+    }
+    MerchantKey mkey = (MerchantKey)obj;
+    return checkStringEqual(this.merchantId, mkey.merchantId)
+        && checkIntEqual(this.terminalId, mkey.terminalId)
+        && checkIntEqual(this.zipCode, mkey.zipCode)
+        && checkStringEqual(this.country, mkey.country)
+        // Enum constants are singletons, so == is exact and, unlike ordinal(), safe when either side is null.
+        && this.merchantType == mkey.merchantType;
+  }
+
+  // Null-safe value comparison of two boxed integers; null only equals null.
+  private boolean checkIntEqual(Integer a, Integer b)
+  {
+    if (a == null || b == null) {
+      return a == null && b == null;
+    }
+    return a.intValue() == b.intValue();
+  }
+
+  // Null-safe string comparison; null only equals null.
+  private boolean checkStringEqual(String a, String b)
+  {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /**
+   * Renders the non-null identifying fields as {@code |n:value} segments, in field order.
+   *
+   * @return the concatenated segments, or an empty string when every field is null
+   */
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    if (merchantId != null) {
+      sb.append("|1:").append(merchantId);
+    }
+    if (terminalId != null) {
+      sb.append("|2:").append(terminalId);
+    }
+    if (zipCode != null) {
+      sb.append("|3:").append(zipCode);
+    }
+    if (country != null) {
+      sb.append("|4:").append(country);
+    }
+    if (merchantType != null) {
+      sb.append("|5:").append(merchantType);
+    }
+    return sb.toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
new file mode 100644
index 0000000..e6a4680
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantQueryInputHandler.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.util.Map;
+
+/**
+ * Common utility class that can be used by all other operators to handle user input
+ * captured from the Web socket input port.
+ *
+ * @since 0.9.0
+ */
+public class MerchantQueryInputHandler
+{
+  public static final String KEY_DATA = "data";
+  public static final String KEY_MERCHANT_ID = "merchantId";
+  public static final String KEY_TERMINAL_ID = "terminalId";
+  public static final String KEY_ZIP_CODE = "zipCode";
+
+  /**
+   * Builds a {@link MerchantKey} from the map stored under {@link #KEY_DATA} in a query tuple.
+   *
+   * <p>Each of merchantId, terminalId and zipCode is copied when present and left null when
+   * absent; a tuple with no {@code "data"} entry is treated like one with an empty entry. The
+   * country is always set to {@code "USA"}. When a merchant id is given, the merchant type is
+   * INTERNET if the id matches (ignoring case) element 2 or 3 of
+   * {@code MerchantTransactionGenerator.merchantIds} and BRICK_AND_MORTAR otherwise; with no
+   * merchant id the type is left unset.
+   *
+   * @param tuple the query tuple; must not be null, other top-level attributes are ignored
+   * @return a new key populated from the tuple
+   * @throws ClassCastException if {@code "data"} is not a map, the merchant id is not a String,
+   *         or a terminal id / zip code is not a {@link Number}
+   */
+  public static MerchantKey process(Map<String, Object> tuple)
+  {
+    // ignoring other top-level attributes.
+    // The nested value is statically just an Object; the cast is checked lazily as entries are read.
+    @SuppressWarnings("unchecked")
+    Map<String, Object> data = (Map<String, Object>)tuple.get(KEY_DATA);
+
+    MerchantKey key = new MerchantKey();
+    key.country = "USA";
+    if (data == null) {
+      // No "data" section at all: same outcome as an empty one instead of a NullPointerException.
+      return key;
+    }
+
+    key.merchantId = (String)data.get(KEY_MERCHANT_ID);
+    key.terminalId = toInteger(data.get(KEY_TERMINAL_ID));
+    key.zipCode = toInteger(data.get(KEY_ZIP_CODE));
+
+    if (key.merchantId != null) {
+      boolean internet = key.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[2])
+          || key.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[3]);
+      key.merchantType = internet
+          ? MerchantTransaction.MerchantType.INTERNET
+          : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
+    }
+    return key;
+
+  }
+
+  // Converts an optional numeric value to Integer; accepts any Number (e.g. a Long) rather than only Integer.
+  private static Integer toInteger(Object value)
+  {
+    return value == null ? null : Integer.valueOf(((Number)value).intValue());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
----------------------------------------------------------------------
diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
new file mode 100644
index 0000000..a722492
--- /dev/null
+++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransaction.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.frauddetect;
+
+import java.io.Serializable;
+
+/**
+ * POJO for BIN Alert related data.
+ *
+ * <p>Every field except {@code userGenerated} takes part in {@link #equals(Object)} and
+ * {@link #hashCode()}; a null field only matches another null.
+ *
+ * @since 0.9.0
+ */
+public class MerchantTransaction implements Serializable
+{
+  public enum MerchantType
+  {
+    UNDEFINED, BRICK_AND_MORTAR, INTERNET
+  }
+
+  public enum TransactionType
+  {
+    UNDEFINED, POS
+  }
+
+  public String ccNum;
+  public String bankIdNum;
+  public String fullCcNum;
+  public Long amount;
+  public String merchantId;
+  public Integer terminalId;
+  public Integer zipCode;
+  public String country;
+  public MerchantType merchantType = MerchantType.UNDEFINED;
+  public TransactionType transactionType = TransactionType.UNDEFINED;
+  public Long time;
+  public boolean userGenerated;
+
+  public MerchantTransaction()
+  {
+  }
+
+  /**
+   * Combines the same fields that {@link #equals(Object)} compares.
+   *
+   * <p>Enums contribute the hash of their constant name instead of {@code Enum.hashCode()}:
+   * the latter is identity based, so it may differ from one JVM to the next.
+   *
+   * @return a hash code consistent with {@link #equals(Object)}
+   */
+  @Override
+  public int hashCode()
+  {
+    int result = 17;
+    result = 31 * result + valueHash(ccNum);
+    result = 31 * result + valueHash(bankIdNum);
+    result = 31 * result + valueHash(amount);
+    result = 31 * result + valueHash(merchantId);
+    result = 31 * result + valueHash(terminalId);
+    result = 31 * result + valueHash(zipCode);
+    result = 31 * result + valueHash(country);
+    result = 31 * result + nameHash(merchantType);
+    result = 31 * result + nameHash(transactionType);
+    result = 31 * result + valueHash(fullCcNum);
+    result = 31 * result + valueHash(time);
+    return result;
+  }
+
+  // Hash of a String/Integer/Long field (all value based), or 0 when the field is null.
+  private static int valueHash(Object value)
+  {
+    return value == null ? 0 : value.hashCode();
+  }
+
+  // Hash of an enum constant's name, or 0 when the field is null.
+  private static int nameHash(Enum<?> value)
+  {
+    return value == null ? 0 : value.name().hashCode();
+  }
+
+  /**
+   * Compares every field except {@code userGenerated}.
+   *
+   * @param obj the object to compare with; may be null
+   * @return true when {@code obj} is a MerchantTransaction whose compared fields all match
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof MerchantTransaction)) {
+      return false;
+    }
+    MerchantTransaction mtx = (MerchantTransaction)obj;
+    return checkStringEqual(this.ccNum, mtx.ccNum)
+        && checkStringEqual(this.bankIdNum, mtx.bankIdNum)
+        && checkLongEqual(this.amount, mtx.amount)
+        && checkStringEqual(this.merchantId, mtx.merchantId)
+        && checkIntEqual(this.terminalId, mtx.terminalId)
+        && checkIntEqual(this.zipCode, mtx.zipCode)
+        && checkStringEqual(this.country, mtx.country)
+        // Enum constants are singletons, so == is exact and, unlike ordinal(), safe when a field was set to null.
+        && this.merchantType == mtx.merchantType
+        && this.transactionType == mtx.transactionType
+        && checkStringEqual(this.fullCcNum, mtx.fullCcNum)
+        && checkLongEqual(this.time, mtx.time);
+  }
+
+  // Null-safe value comparison of two boxed integers; null only equals null.
+  private boolean checkIntEqual(Integer a, Integer b)
+  {
+    if (a == null || b == null) {
+      return a == null && b == null;
+    }
+    return a.intValue() == b.intValue();
+  }
+
+  // Null-safe value comparison of two boxed longs; null only equals null.
+  private boolean checkLongEqual(Long a, Long b)
+  {
+    if (a == null || b == null) {
+      return a == null && b == null;
+    }
+    return a.longValue() == b.longValue();
+  }
+
+  // Null-safe string comparison; null only equals null.
+  private boolean checkStringEqual(String a, String b)
+  {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /**
+   * Renders the non-null fields as {@code |n:value} segments in a fixed order.
+   *
+   * @return the concatenated segments, or an empty string when every rendered field is null
+   */
+  @Override
+  public String toString()
+  {
+    StringBuilder sb = new StringBuilder();
+    if (ccNum != null) {
+      sb.append("|0:").append(ccNum);
+    }
+    if (bankIdNum != null) {
+      sb.append("|1:").append(bankIdNum);
+    }
+    if (fullCcNum != null) {
+      sb.append("|2:").append(fullCcNum);
+    }
+    if (amount != null) {
+      sb.append("|3:").append(amount);
+    }
+    if (merchantId != null) {
+      sb.append("|4:").append(merchantId);
+    }
+    if (terminalId != null) {
+      sb.append("|5:").append(terminalId);
+    }
+    if (zipCode != null) {
+      sb.append("|6:").append(zipCode);
+    }
+    if (country != null) {
+      sb.append("|7:").append(country);
+    }
+    if (merchantType != null) {
+      sb.append("|8:").append(merchantType);
+    }
+    if (transactionType != null) {
+      sb.append("|9:").append(transactionType);
+    }
+    if (time != null) {
+      sb.append("|10:").append(time);
+    }
+    return sb.toString();
+  }
+
+}
