http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java new file mode 100644 index 0000000..4a57207 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionBucketOperator.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/
package org.apache.apex.examples.frauddetect;

import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.lang.mutable.MutableLong;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;

/**
 * A bucket-like operator to accept merchant transaction object and dissipate the
 * transaction amount to the further downstream operator for calculating min, max and std-deviation.
 * <p>
 * Every {@link MerchantTransaction} received on either input port is fanned out to three ports,
 * each keyed by a {@link MerchantKey} built from the transaction:
 * <ul>
 * <li>{@link #binCountOutputPort}: ((merchant key, bank id number), 1)</li>
 * <li>{@link #txOutputPort}: (merchant key, amount)</li>
 * <li>{@link #ccAlertOutputPort}: (merchant key, full card number and amount)</li>
 * </ul>
 * At the end of every streaming window a summary map is emitted on {@link #summaryTxnOutputPort}
 * and the per-window counters are reset.
 *
 * @since 0.9.0
 */
public class MerchantTransactionBucketOperator extends BaseOperator
{
  private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionBucketOperator.class);

  /** Emits ((merchant key, bank id number), 1) for every transaction. */
  public final transient DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> binCountOutputPort =
      new DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>>();
  /** Emits (merchant key, transaction amount) for every transaction. */
  public final transient DefaultOutputPort<KeyValPair<MerchantKey, Long>> txOutputPort =
      new DefaultOutputPort<KeyValPair<MerchantKey, Long>>();
  /** Emits (merchant key, full card number and amount) for every transaction. */
  public final transient DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> ccAlertOutputPort =
      new DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>>();
  /** Emits one summary map per streaming window (see {@link #endWindow()}). */
  public final transient DefaultOutputPort<Map<String, Object>> summaryTxnOutputPort =
      new DefaultOutputPort<Map<String, Object>>();

  // Count of every transaction processed by this operator instance; never reset.
  private MutableLong totalTxns = new MutableLong(0);
  // Per-window count and amount. Both are reset in endWindow(), so despite their names they
  // cover one streaming window rather than literally one second.
  private MutableLong txnsInLastSecond = new MutableLong(0);
  private MutableDouble amtInLastSecond = new MutableDouble(0);
  private transient DecimalFormat amtFormatter = new DecimalFormat("#.##");

  /** Receives transactions to be bucketed. */
  public transient DefaultInputPort<MerchantTransaction> inputPort = new DefaultInputPort<MerchantTransaction>()
  {
    @Override
    public void process(MerchantTransaction tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Second transaction input (presumably user-submitted transactions, judging by the name);
   * handled exactly like {@link #inputPort}.
   */
  public transient DefaultInputPort<MerchantTransaction> txUserInputPort = new DefaultInputPort<MerchantTransaction>()
  {
    @Override
    public void process(MerchantTransaction tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Emits the per-window summary and resets the per-window counters.
   * <p>
   * The summary holds immutable snapshots of the counters rather than the {@code Mutable*} objects
   * themselves: the counters are reset right after the emit and keep changing with every tuple, so
   * a downstream operator that receives this same map instance (no serialization in between) would
   * otherwise observe reset or later values.
   */
  @Override
  public void endWindow()
  {
    long txnCount = txnsInLastSecond.longValue();
    double amount = amtInLastSecond.doubleValue();
    double avg = txnCount == 0 ? 0 : amount / txnCount;

    Map<String, Object> summary = new HashMap<String, Object>();
    summary.put("totalTxns", totalTxns.longValue());
    summary.put("txnsInLastSecond", txnCount);
    summary.put("amtInLastSecond", amtFormatter.format(amount));
    summary.put("avgAmtInLastSecond", amtFormatter.format(avg));
    summaryTxnOutputPort.emit(summary);

    txnsInLastSecond.setValue(0);
    amtInLastSecond.setValue(0);
  }

  /**
   * Fans one transaction out to the bin-count, amount and credit-card ports and updates the counters.
   */
  private void processTuple(MerchantTransaction tuple)
  {
    emitBankIdNumTuple(tuple, binCountOutputPort);
    emitMerchantKeyTuple(tuple, txOutputPort);
    emitCreditCardKeyTuple(tuple, ccAlertOutputPort);
    totalTxns.increment();
    txnsInLastSecond.increment();
    amtInLastSecond.add(tuple.amount);
  }

  /** Emits (merchant key, amount) on {@code outputPort}. */
  private void emitMerchantKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, Long>> outputPort)
  {
    MerchantKey key = getMerchantKey(tuple);
    KeyValPair<MerchantKey, Long> keyValPair = new KeyValPair<MerchantKey, Long>(key, tuple.amount);
    outputPort.emit(keyValPair);
  }

  /** Emits ((merchant key, bank id number), 1) on {@code outputPort} so downstream can count per BIN. */
  private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> outputPort)
  {
    MerchantKey key = getMerchantKey(tuple);
    KeyValPair<MerchantKey, String> keyValPair = new KeyValPair<MerchantKey, String>(key, tuple.bankIdNum);
    outputPort.emit(new KeyValPair<KeyValPair<MerchantKey, String>, Integer>(keyValPair, 1));
  }

  /** Emits (merchant key, card data holding the full card number and amount) on {@code outputPort}. */
  private void emitCreditCardKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> outputPort)
  {
    MerchantKey key = getMerchantKey(tuple);

    CreditCardData data = new CreditCardData();
    data.fullCcNum = tuple.fullCcNum;
    data.amount = tuple.amount;

    KeyValPair<MerchantKey, CreditCardData> keyValPair = new KeyValPair<MerchantKey, CreditCardData>(key, data);
    outputPort.emit(keyValPair);
  }

  /**
   * Builds a new {@link MerchantKey} from the transaction's merchant attributes. A fresh instance is
   * created on every call, so the three output ports never share a key object.
   */
  private MerchantKey getMerchantKey(MerchantTransaction tuple)
  {
    MerchantKey key = new MerchantKey();
    key.merchantId = tuple.merchantId;
    key.terminalId = tuple.terminalId;
    key.zipCode = tuple.zipCode;
    key.country = tuple.country;
    key.merchantType = tuple.merchantType;
    key.userGenerated = tuple.userGenerated;
    key.time = tuple.time;
    return key;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java new file mode 100644 index 0000000..2327344 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionGenerator.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.examples.frauddetect.util.JsonUtils; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; + +import com.datatorrent.common.util.BaseOperator; + +/** + * Information tuple generator with randomness. 
*
 * @since 0.9.0
 */
public class MerchantTransactionGenerator extends BaseOperator implements InputOperator
{
  private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionGenerator.class);

  public static final int[] zipCodes = {94086, 94087, 94088, 94089, 94090, 94091, 94092, 94093};
  public static final String[] merchantIds = {"Wal-Mart", "Target", "Amazon", "Apple", "Sears", "Macys", "JCPenny", "Levis"};

  private final Random randomNum = new Random();
  private int amountMin = 1;
  private int amountMax = 400;
  private int merchantIdMin = 0;
  private int merchantIdMax = merchantIds.length - 1;
  private int terminalIdMin = 1;
  private int terminalIdMax = 8;
  private int zipMin = 0;
  private int zipMax = zipCodes.length - 1;
  private int tupleBlastSize = 2000;
  private boolean stopGeneration = false;

  /** Emits every generated transaction as an object. */
  public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
      new DefaultOutputPort<MerchantTransaction>();
  /** Emits every generated transaction as a JSON string. */
  public transient DefaultOutputPort<String> txDataOutputPort =
      new DefaultOutputPort<String>();

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
  }

  /**
   * Generates up to {@link #getTupleBlastSize()} random transactions (none once generation has been
   * stopped via {@link #setStopGeneration(boolean)}). Each one is emitted on {@link #txOutputPort}.
   * After the batch, every transaction is emitted again as JSON on {@link #txDataOutputPort}. The
   * call then sleeps 100 ms to throttle the generation rate.
   */
  @Override
  public void emitTuples()
  {
    List<MerchantTransaction> txList = new ArrayList<MerchantTransaction>();
    for (int count = 0; !stopGeneration && count < getTupleBlastSize(); count++) {
      MerchantTransaction tx = newRandomTransaction();
      txOutputPort.emit(tx);
      txList.add(tx);
    }

    // The JSON copies go out only after the whole batch has been emitted on txOutputPort.
    for (MerchantTransaction txData : txList) {
      try {
        txDataOutputPort.emit(JsonUtils.toJson(txData));
      } catch (IOException e) {
        logger.warn("Exception while converting object to JSON", e);
      }
    }

    try {
      Thread.sleep(100);
    } catch (InterruptedException e) {
      // Restore the interrupt status instead of swallowing it, so the code driving this operator
      // thread can still observe the interruption.
      Thread.currentThread().interrupt();
      logger.debug("Interrupted while throttling transaction generation", e);
    }
  }

  /** Builds one random, non-user-generated POS transaction stamped with the current time. */
  private MerchantTransaction newRandomTransaction()
  {
    String bankIdNum = genBankIdNum();
    String ccNum = genCC();
    int merchant = genMerchantId();
    int terminal = genTerminalId();
    int zip = genZip();
    long amount = genAmount();

    // Merchants at index 2 and 3 ("Amazon", "Apple") are INTERNET merchants; all others are
    // BRICK_AND_MORTAR.
    boolean internetMerchant = merchant == 2 || merchant == 3;

    MerchantTransaction tx = new MerchantTransaction();
    tx.bankIdNum = bankIdNum;
    tx.ccNum = ccNum;
    tx.fullCcNum = tx.bankIdNum + " " + tx.ccNum;
    tx.amount = amount;
    tx.merchantId = merchantIds[merchant];
    tx.merchantType = internetMerchant
        ? MerchantTransaction.MerchantType.INTERNET
        : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
    tx.transactionType = MerchantTransaction.TransactionType.POS;

    // Only BRICK_AND_MORTAR merchants get a terminal id.
    if (!internetMerchant) {
      tx.terminalId = terminal;
    }
    tx.zipCode = zipCodes[zip];
    tx.country = "USA";
    tx.time = System.currentTimeMillis();
    tx.userGenerated = false;
    return tx;
  }

  /**
   * Generates a random bank identification number.
   *
   * @return one of 100 values, "1000 0000", "1010 0000", ... up to "1990 0000"
   */
  public String genBankIdNum()
  {
    int base = randomNum.nextInt(100) + 100;
    return base + "0 0000";
  }

  /**
   * Generates a random card-number suffix.
   *
   * @return one of 100,000 values from "1000 0000" to "1009 9999"
   */
  public String genCC()
  {
    int base = (randomNum.nextInt(100000) + 10000000);
    String baseString = Integer.toString(base);
    return baseString.substring(0, 4) + " " + baseString.substring(4);
  }

  /**
   * Generates a random transaction amount.
   * <p>
   * The upper bound is itself randomized by up to 49, so results lie between {@code amountMin} and
   * {@code amountMax + 48} (inclusive) and are not uniformly distributed.
   *
   * @return the generated amount
   */
  public int genAmount()
  {
    int lowRange = 50;
    int range = amountMax - amountMin + randomNum.nextInt(lowRange);
    return amountMin + randomNum.nextInt(range);
  }

  /** @return a random index into {@link #merchantIds} */
  public int genMerchantId()
  {
    int range = merchantIdMax - merchantIdMin + 1;
    return merchantIdMin + randomNum.nextInt(range);
  }

  /** @return a random terminal id between 1 and 8 (inclusive) */
  public int genTerminalId()
  {
    int range = terminalIdMax - terminalIdMin + 1;
    return terminalIdMin + randomNum.nextInt(range);
  }

  /** @return a random index into {@link #zipCodes} */
  public int genZip()
  {
    int range = zipMax - zipMin + 1;
    return zipMin + randomNum.nextInt(range);
  }

  /**
   * @param stopGeneration when true, subsequent {@link #emitTuples()} calls generate no transactions
   */
  public void setStopGeneration(boolean stopGeneration)
  {
    this.stopGeneration = stopGeneration;
  }

  /**
   * @return the maximum number of transactions generated per {@link #emitTuples()} call
   */
  public int getTupleBlastSize()
  {
    return tupleBlastSize;
  }

  /**
   * @param tupleBlastSize the maximum number of transactions generated per {@link #emitTuples()} call
   */
  public void setTupleBlastSize(int tupleBlastSize)
  {
    this.tupleBlastSize = tupleBlastSize;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java new file mode 100644 index 0000000..6dd9c2f --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/MerchantTransactionInputHandler.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Common utility class that can be used by all other operators to handle user input + * captured from the Web socket input port. 
*
 * @since 0.9.0
 */
public class MerchantTransactionInputHandler extends BaseOperator
{
  public static final String KEY_BANK_ID_NUMBER = "bankIdNum"; // first 12 digits
  public static final String KEY_CREDIT_CARD_NUMBER = "ccNum"; // last 4 digits
  public static final String KEY_MERCHANT_ID = "merchantId";
  public static final String KEY_TERMINAL_ID = "terminalId";
  public static final String KEY_ZIP_CODE = "zipCode";
  public static final String KEY_AMOUNT = "amount";

  private static final Logger logger = LoggerFactory.getLogger(MerchantTransactionInputHandler.class);

  /** Emits one transaction per successfully converted input map. */
  public transient DefaultOutputPort<MerchantTransaction> txOutputPort =
      new DefaultOutputPort<MerchantTransaction>();

  /**
   * Receives user input as a map from the {@code KEY_*} field names to string values. Input that
   * cannot be converted is logged and dropped.
   */
  public transient DefaultInputPort<Map<String, String>> userTxInputPort = new DefaultInputPort<Map<String, String>>()
  {
    @Override
    public void process(Map<String, String> tuple)
    {
      try {
        txOutputPort.emit(processInput(tuple));
      } catch (Exception exc) {
        logger.error("Exception while handling the input", exc);
      }
    }
  };

  /**
   * Converts a user-submitted field map into a {@link MerchantTransaction}.
   *
   * @param tuple map from the {@code KEY_*} field names to their string values
   * @return a user-generated POS transaction in country "USA", stamped with the current time
   * @throws NumberFormatException if the terminal id, zip code or amount is not a valid number
   * @throws IllegalArgumentException if any required field is missing or has a null value
   */
  public MerchantTransaction processInput(Map<String, String> tuple)
  {
    String bankIdNum = tuple.get(KEY_BANK_ID_NUMBER);
    String ccNum = tuple.get(KEY_CREDIT_CARD_NUMBER);
    String merchantId = tuple.get(KEY_MERCHANT_ID);
    Integer terminalId = parseInteger(tuple.get(KEY_TERMINAL_ID));
    Integer zipCode = parseInteger(tuple.get(KEY_ZIP_CODE));
    String amountText = tuple.get(KEY_AMOUNT);
    Long amount = amountText == null ? null : Long.valueOf(amountText);

    if (bankIdNum == null || ccNum == null || merchantId == null || terminalId == null || zipCode == null || amount == null) {
      throw new IllegalArgumentException("Missing required input!");
    }

    MerchantTransaction tx = new MerchantTransaction();
    tx.bankIdNum = bankIdNum;
    tx.ccNum = ccNum;
    tx.fullCcNum = bankIdNum + " " + ccNum;
    tx.merchantId = merchantId;
    tx.terminalId = terminalId;
    tx.zipCode = zipCode;
    tx.country = "USA";
    tx.amount = amount;
    // Same rule as the generator: merchants at index 2 and 3 of its merchant list are INTERNET.
    tx.merchantType = tx.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[2])
        || tx.merchantId.equalsIgnoreCase(MerchantTransactionGenerator.merchantIds[3])
        ? MerchantTransaction.MerchantType.INTERNET
        : MerchantTransaction.MerchantType.BRICK_AND_MORTAR;
    tx.transactionType = MerchantTransaction.TransactionType.POS;

    tx.userGenerated = true;
    tx.time = System.currentTimeMillis();

    return tx;
  }

  /** Parses {@code text} as an Integer, passing {@code null} through unchanged. */
  private static Integer parseInteger(String text)
  {
    return text == null ? null : Integer.valueOf(text);
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java new file mode 100644 index 0000000..dc2c942 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumKeyVal.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +import java.util.ArrayList; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + +import com.datatorrent.lib.multiwindow.AbstractSlidingWindowKeyVal; +import com.datatorrent.lib.util.KeyValPair; + + +/** + * Sliding window sum operator + * + * @since 0.9.0 + */ +public class SlidingWindowSumKeyVal<K, V extends Number> extends AbstractSlidingWindowKeyVal<K, V, SlidingWindowSumObject> +{ + + /** + * Output port to emit simple moving average (SMA) of last N window as Double. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleSum = new DefaultOutputPort<KeyValPair<K, Double>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as Float. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Float>> floatSum = new DefaultOutputPort<KeyValPair<K, Float>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as Long. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Long>> longSum = new DefaultOutputPort<KeyValPair<K, Long>>(); + /** + * Output port to emit simple moving average (SMA) of last N window as + * Integer. 
+ */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Integer>> integerSum = new DefaultOutputPort<KeyValPair<K, Integer>>(); + + + @Override + public void processDataTuple(KeyValPair<K, V> tuple) + { + K key = tuple.getKey(); + ArrayList<SlidingWindowSumObject> stateList = buffer.get(key); + if (stateList == null) { + stateList = new ArrayList<SlidingWindowSumObject>(); + for (int i = 0; i < windowSize; ++i) { + stateList.add(new SlidingWindowSumObject()); + } + buffer.put(key, stateList); + } + SlidingWindowSumObject state = stateList.get(currentstate); + state.add(tuple.getValue()); + } + + @Override + public void emitTuple(K key, ArrayList<SlidingWindowSumObject> obj) + { + double sum = 0; + for (int i = 0; i < obj.size(); ++i) { + SlidingWindowSumObject state = obj.get(i); + sum += state.getSum(); + } + if (doubleSum.isConnected()) { + doubleSum.emit(new KeyValPair<K, Double>(key, sum)); + } + if (floatSum.isConnected()) { + floatSum.emit(new KeyValPair<K, Float>(key, (float)sum)); + } + if (longSum.isConnected()) { + longSum.emit(new KeyValPair<K, Long>(key, (long)sum)); + } + if (integerSum.isConnected()) { + integerSum.emit(new KeyValPair<K, Integer>(key, (int)sum)); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java new file mode 100644 index 0000000..fc5f95d --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/SlidingWindowSumObject.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + + +import org.apache.commons.lang.mutable.MutableDouble; + +import com.datatorrent.lib.multiwindow.SimpleMovingAverageObject; + +/** + * State object for sliding window sum + * + * @since 0.9.0 + */ +public class SlidingWindowSumObject extends SimpleMovingAverageObject +{ + + MutableDouble sum = new MutableDouble(0); + + public void add(Number n) + { + sum.add(n); + } + + @Override + public double getSum() + { + return sum.doubleValue(); + } + + @Override + public void clear() + { + sum.setValue(0); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java new file mode 100644 index 0000000..71f3035 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsAggregator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.examples.frauddetect.util.JsonUtils; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.HighLow; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Operator to aggregate the min, max, sma, std-dev and variance for the given key. 
*
 * <p>
 * Note: only min, max and SMA are currently filled in; std-dev and variance are not computed by
 * this operator. Statistics are collected per key during a window, emitted as JSON at the end of
 * the window and then discarded.
 *
 * @since 0.9.0
 */
public class TransactionStatsAggregator extends BaseOperator
{
  private static final Logger logger = LoggerFactory.getLogger(TransactionStatsAggregator.class);

  /** Per-key statistics of the current window; cleared at the end of every window. */
  public Map<MerchantKey, TransactionStatsData> aggrgateMap =
      new HashMap<MerchantKey, TransactionStatsData>();

  /** Emits one JSON document per key at the end of every window. */
  public final transient DefaultOutputPort<String> txDataOutputPort = new DefaultOutputPort<String>();

  /** Receives (key, high/low) pairs and records them as the key's max and min. */
  public final transient DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>> rangeInputPort =
      new DefaultInputPort<KeyValPair<MerchantKey, HighLow<Long>>>()
  {
    @Override
    public void process(KeyValPair<MerchantKey, HighLow<Long>> tuple)
    {
      TransactionStatsData data = getDataObjectFromMap(tuple.getKey());
      HighLow<Long> range = tuple.getValue();
      data.min = range.getLow();
      data.max = range.getHigh();
    }
  };

  /** Receives (key, simple moving average) pairs and records the SMA for the key. */
  public final transient DefaultInputPort<KeyValPair<MerchantKey, Long>> smaInputPort =
      new DefaultInputPort<KeyValPair<MerchantKey, Long>>()
  {
    @Override
    public void process(KeyValPair<MerchantKey, Long> tuple)
    {
      TransactionStatsData data = getDataObjectFromMap(tuple.getKey());
      data.sma = tuple.getValue();
    }
  };

  /**
   * Returns the statistics record for {@code key}, creating and registering one on first use. A
   * new record copies the merchant attributes from the key (terminal id 0 when the key has none)
   * and is stamped with the current time.
   */
  private TransactionStatsData getDataObjectFromMap(MerchantKey key)
  {
    TransactionStatsData data = aggrgateMap.get(key);
    if (data == null) {
      data = new TransactionStatsData();
      data.time = System.currentTimeMillis();
      data.merchantId = key.merchantId;
      data.terminalId = key.terminalId == null ? 0 : key.terminalId;
      data.zipCode = key.zipCode;
      data.merchantType = key.merchantType;
      aggrgateMap.put(key, data);
    }
    return data;
  }

  /**
   * Emits every collected record as JSON, then discards them. Records that fail to serialize are
   * logged and skipped.
   */
  @Override
  public void endWindow()
  {
    for (TransactionStatsData data : aggrgateMap.values()) {
      try {
        txDataOutputPort.emit(JsonUtils.toJson(data));
      } catch (IOException e) {
        logger.warn("Exception while converting object to JSON", e);
      }
    }
    aggrgateMap.clear();
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java new file mode 100644 index 0000000..f0b2b86 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/TransactionStatsData.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +/** + * POJO to capture transaction data related to min, max, sma, std-dev, variance. + * + * @since 0.9.0 + */ +public class TransactionStatsData +{ + public String merchantId; + public int terminalId; + public int zipCode; + public MerchantTransaction.MerchantType merchantType; + public long min; + public long max; + public double sma; + public long time; +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java new file mode 100644 index 0000000..89c4bcd --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/HdfsStringOutputOperator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect.operator; + +import java.io.File; + +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Adapter for writing Strings to HDFS + * <p> + * Serializes tuples into a HDFS file.<br/> + * </p> + * + * @since 0.9.4 + */ +public class HdfsStringOutputOperator extends AbstractFileOutputOperator<String> +{ + private transient String outputFileName; + private transient String contextId; + private int index = 0; + + public HdfsStringOutputOperator() + { + setMaxLength(1024 * 1024); + } + + @Override + public void setup(OperatorContext context) + { + contextId = context.getValue(DAGContext.APPLICATION_NAME); + outputFileName = File.separator + contextId + + File.separator + "transactions.out.part"; + super.setup(context); + } + + @Override + public byte[] getBytesForTuple(String t) + { + return t.getBytes(); + } + + @Override + protected String getFileName(String tuple) + { + return outputFileName; + } + + @Override + public String getPartFileName(String fileName, int part) + { + return fileName + part; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java new file mode 100644 index 0000000..e059c03 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/operator/MongoDBOutputOperator.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect.operator; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; +import com.mongodb.util.JSON; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; + + +/** + * Operator to write data into MongoDB + * + * @since 0.9.0 + */ +public class MongoDBOutputOperator extends BaseOperator +{ + @NotNull + protected String hostName; + @NotNull + protected String dataBase; + @NotNull + protected String collection; + + protected WriteConcern writeConcern = WriteConcern.ACKNOWLEDGED; + + protected String userName; + protected String passWord; + + protected transient MongoClient mongoClient; + protected transient DB db; + protected transient DBCollection dbCollection; + + protected List<DBObject> dataList = new ArrayList<DBObject>(); + + public 
MongoDBOutputOperator() + { + } + + /** + * Take the JSON formatted string and convert it to DBObject + */ + public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() + { + @Override + public void process(String tuple) + { + dataList.add((DBObject)JSON.parse(tuple)); + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + try { + mongoClient = new MongoClient(hostName); + db = mongoClient.getDB(dataBase); + if (userName != null && passWord != null) { + if (!db.authenticate(userName, passWord.toCharArray())) { + throw new IllegalArgumentException("MongoDB authentication failed. Illegal username and password for MongoDB!!"); + } + } + dbCollection = db.getCollection(collection); + } catch (UnknownHostException ex) { + logger.debug(ex.toString()); + } + } + + @Override + public void beginWindow(long windowId) + { + // nothing + } + + @Override + public void endWindow() + { + logger.debug("mongo datalist size: " + dataList.size()); + if (dataList.size() > 0) { + WriteResult result = dbCollection.insert(dataList, writeConcern); + logger.debug("Result for MongoDB insert: " + result); + dataList.clear(); + } + } + + @Override + public void teardown() + { + if (mongoClient != null) { + mongoClient.close(); + } + } + + public String getHostName() + { + return hostName; + } + + public void setHostName(String hostName) + { + this.hostName = hostName; + } + + public String getDataBase() + { + return dataBase; + } + + public void setDataBase(String dataBase) + { + this.dataBase = dataBase; + } + + public String getCollection() + { + return collection; + } + + public void setCollection(String collection) + { + this.collection = collection; + } + + public String getUserName() + { + return userName; + } + + public void setUserName(String userName) + { + this.userName = userName; + } + + public String getPassWord() + { + return passWord; + } + + public void setPassWord(String passWord) + { + 
this.passWord = passWord; + } + + public WriteConcern getWriteConcern() + { + return writeConcern; + } + + public void setWriteConcern(WriteConcern writeConcern) + { + this.writeConcern = writeConcern; + } + + private static final Logger logger = LoggerFactory.getLogger(MongoDBOutputOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java new file mode 100644 index 0000000..932ef64 --- /dev/null +++ b/examples/frauddetect/src/main/java/org/apache/apex/examples/frauddetect/util/JsonUtils.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.frauddetect.util; + +import java.io.IOException; + +import org.codehaus.jackson.map.ObjectMapper; + +/** + * Utility class to deal with JSON and Object + * + * @since 0.9.0 + */ +public class JsonUtils +{ + private static final ObjectMapper mapper = new ObjectMapper(); + + public static String toJson(Object obj) throws IOException + { + return mapper.writeValueAsString(obj); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/main/resources/META-INF/properties.xml b/examples/frauddetect/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..7a42ac4 --- /dev/null +++ b/examples/frauddetect/src/main/resources/META-INF/properties.xml @@ -0,0 +1,167 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> +<property> + <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> + <value>1000</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.ccUserAlertQueryOutput.topic</name> + <value>examples.app.frauddetect.fraudAlert</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.binUserAlertOutput.topic</name> + <value>examples.app.frauddetect.fraudAlert</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.avgUserAlertQueryOutput.topic</name> + <value>examples.app.frauddetect.fraudAlert</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.txSummaryWsOutput.topic</name> + <value>examples.app.frauddetect.txSummary</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.attr.APPLICATION_WINDOW_COUNT + </name> + <value>10</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.threshold + </name> + <value>20</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.rangePerMerchant.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.txReceiver.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<!-- property> + <name>dt.application.frauddetect.operator.smaPerMerchant.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property--> +<property> + <name>dt.application.FraudDetectExample.operator.smaPerMerchant.windowSize + </name> + <value>30</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.movingSum.attr.APPLICATION_WINDOW_COUNT + </name> + <value>10</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.movingSum.windowSize + </name> + <value>3</value> +</property> +<property> + 
<name>dt.application.FraudDetectExample.operator.avgAlerter.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.avgAlerter.threshold + </name> + <value>1200</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.amountFraudDetector.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.amountFraudDetector.threshold + </name> + <value>420</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.collection</name> + <value>txStats</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.collection</name> + <value>binAlerts</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.collection</name> + <value>ccAlerts</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + 
<name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.collection</name> + <value>avgAlerts</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.txStatsAggregator.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> + +<property> + <name>dt.application.FraudDetectExample.port.*.attr.QUEUE_CAPACITY</name> + <value>32000</value> + </property> + <property> + <name>dt.application.FraudDetectExample.operator.*.attr.MEMORY_MB</name> + <value>2048</value> + </property> + +</configuration> + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java b/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java new file mode 100644 index 0000000..ffb1cf2 --- /dev/null +++ b/examples/frauddetect/src/test/java/org/apache/apex/examples/frauddetect/FrauddetectApplicationTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.frauddetect; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +/** + * Fraud detection application test + */ +public class FrauddetectApplicationTest +{ + + public FrauddetectApplicationTest() + { + } + + @Test + public void testApplication() throws Exception + { + try { + Application application = new Application(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-frauddetect.xml"); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(application, conf); + lma.getController().run(120000); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml b/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml new file mode 100644 index 0000000..7a404c4 --- /dev/null +++ b/examples/frauddetect/src/test/resources/dt-site-frauddetect.xml @@ -0,0 +1,173 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>dt.application.FraudDetectExample.class</name> + <value>com.datatorrent.examples.frauddetect.Application</value> + <description>An alias for the application</description> +</property> +<property> + <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> + <value>1000</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.ccUserAlertQueryOutput.topic</name> + <value>examples.app.frauddetect.fraudAlert</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.binUserAlertOutput.topic</name> + <value>examples.app.frauddetect.fraudAlert</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.avgUserAlertQueryOutput.topic</name> + <value>examples.app.frauddetect.fraudAlert</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.txSummaryWsOutput.topic</name> + <value>examples.app.frauddetect.txSummary</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.attr.APPLICATION_WINDOW_COUNT + </name> + <value>10</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.bankInfoFraudDetector.threshold + </name> + <value>20</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.rangePerMerchant.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.txReceiver.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<!-- property> + 
<name>dt.application.frauddetect.operator.smaPerMerchant.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property--> +<property> + <name>dt.application.FraudDetectExample.operator.smaPerMerchant.windowSize + </name> + <value>30</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.movingSum.attr.APPLICATION_WINDOW_COUNT + </name> + <value>10</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.movingSum.windowSize + </name> + <value>3</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.avgAlerter.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.avgAlerter.threshold + </name> + <value>1200</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.amountFraudDetector.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.amountFraudDetector.threshold + </name> + <value>420</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoTxStatsOutput.collection</name> + <value>txStats</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoBinAlertsOutput.collection</name> + <value>binAlerts</value> +</property> +<property> + 
<name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoCcAlertsOutput.collection</name> + <value>ccAlerts</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.hostName</name> + <value>localhost</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.dataBase</name> + <value>frauddetect</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.mongoAvgAlertsOutput.collection</name> + <value>avgAlerts</value> +</property> +<property> + <name>dt.application.FraudDetectExample.operator.txStatsAggregator.attr.APPLICATION_WINDOW_COUNT + </name> + <value>1</value> +</property> + +<property> + <name>dt.application.FraudDetectExample.port.*.attr.QUEUE_CAPACITY</name> + <value>32000</value> + </property> +<property> + <name>dt.application.FraudDetectExampleq.operator.*.attr.MEMORY_MB</name> + <value>2048</value> +</property> + + +</configuration> + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/frauddetect/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/frauddetect/src/test/resources/log4j.properties b/examples/frauddetect/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/frauddetect/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/pom.xml ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/pom.xml b/examples/highlevelapi/pom.xml new file mode 100644 index 0000000..da843b7 --- /dev/null +++ b/examples/highlevelapi/pom.xml @@ -0,0 +1,141 @@ +<?xml 
version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-highlevelapi</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar High-Level API Example</name> + <description>Apex exmaple applications that use High-level API to construct a dag</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <build> + <plugins> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> 
+ + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + <dependency> + <!-- required by twitter example --> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + <version>4.0.4</version> + </dependency> + <dependency> + <!-- required by twitter example --> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>4.0.4</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-stream</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.hsqldb</groupId> + <artifactId>hsqldb</artifactId> + <version>2.3.1</version> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>1.4.192</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.9.1</version> + </dependency> + <dependency> + <!--This dependency is needed for StreamingWordExtractTest--> + <groupId>org.codehaus.janino</groupId> + <artifactId>commons-compiler</artifactId> + <version>2.7.8</version> + <type>jar</type> + </dependency> + <dependency> + <!--This dependency is needed for StreamingWordExtractTest--> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>2.7.8</version> + <scope>test</scope> + </dependency> + </dependencies> + + +</project> 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/assemble/appPackage.xml b/examples/highlevelapi/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/highlevelapi/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java new file mode 100644 index 0000000..327c882 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/MinimalWordCount.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam MinimalWordCount Example + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "MinimalWordCount") +public class MinimalWordCount implements StreamingApplication +{ + public static class Collector extends BaseOperator + { + static Map<String, Long> result; + private static boolean done = false; + + public static boolean isDone() + { + return done; + } + + @Override + public void setup(Context.OperatorContext 
context) + { + done = false; + result = new HashMap<>(); + } + + public final transient DefaultInputPort<KeyValPair<String, Long>> input = new DefaultInputPort<KeyValPair<String, Long>>() + { + @Override + public void process(KeyValPair<String, Long> tuple) + { + if (tuple.getKey().equals("bye")) { + done = true; + } + result.put(tuple.getKey(), tuple.getValue()); + } + }; + } + + /** + * Populate the dag using High-Level API. + * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Collector collector = new Collector(); + // Create a stream reading from a file line by line using StreamFactory. + StreamFactory.fromFolder("./src/test/resources/wordcount", name("textInput")) + // Use a flatmap transformation to extract words from the incoming stream of lines. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[^a-zA-Z']+")); + + } + }, name("ExtractWords")) + // Apply windowing to the stream for counting, in this case, the window option is global window. + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + // Count the appearances of every word. + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple<KeyValPair<String, Long>>(new KeyValPair<String, Long>(input, 1L)); + } + }, name("countByKey")) + // Format the counting result to a readable format by unwrapping the tuples. + .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, KeyValPair<String, Long>>() + { + @Override + public KeyValPair<String, Long> f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + return input.getValue(); + } + }, name("FormatResults")) + // Print the result. 
+ .print(name("console")) + // Attach a collector to the stream to collect results. + .endWith(collector, collector.input, name("Collector")) + // populate the dag using the stream. + .populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java new file mode 100644 index 0000000..5b83bd0 --- /dev/null +++ b/examples/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/WindowedWordCount.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.stream.sample; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.joda.time.Duration; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.WindowedStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Throwables; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + +import static org.apache.apex.malhar.stream.api.Option.Options.name; + +/** + * Beam WindowedWordCount Example. + * + * @since 3.5.0 + */ +@ApplicationAnnotation(name = "WindowedWordCount") +public class WindowedWordCount implements StreamingApplication +{ + static final int WINDOW_SIZE = 1; // Default window duration in minutes + + /** + * A input operator that reads from and output a file line by line to downstream with a time gap between + * every two lines. 
+ */ + public static class TextInput extends BaseOperator implements InputOperator + { + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + private boolean done = false; + + private transient BufferedReader reader; + + @Override + public void setup(Context.OperatorContext context) + { + done = false; + initReader(); + } + + private void initReader() + { + try { + InputStream resourceStream = this.getClass().getResourceAsStream("/wordcount/word.txt"); + reader = new BufferedReader(new InputStreamReader(resourceStream)); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + if (!done) { + try { + String line = reader.readLine(); + if (line == null) { + done = true; + reader.close(); + } else { + this.output.emit(line); + } + Thread.sleep(50); + } catch (IOException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + } + } + + public static class Collector extends BaseOperator + { + private static Map<KeyValPair<Long, String>, Long> result = new HashMap<>(); + private static boolean done = false; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + done = false; + } + + public static boolean isDone() + { + return done; + } + + public static Map<KeyValPair<Long, String>, Long> getResult() + { + return result; + } + + public final transient DefaultInputPort<PojoEvent> input = new DefaultInputPort<PojoEvent>() + { + @Override + public void process(PojoEvent tuple) + { + result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getWord()), tuple.getCount()); + if (tuple.getWord().equals("bye")) { + done = true; + } + } + }; + } + + /** + * A Pojo Tuple class used for outputting result to JDBC. 
+ */ + public static class PojoEvent + { + private String word; + private long count; + private long timestamp; + + @Override + public String toString() + { + return "PojoEvent (word=" + getWord() + ", count=" + getCount() + ", timestamp=" + getTimestamp() + ")"; + } + + public String getWord() + { + return word; + } + + public void setWord(String word) + { + this.word = word; + } + + public long getCount() + { + return count; + } + + public void setCount(long count) + { + this.count = count; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + } + + /** + * A map function that wrap the input string with a random generated timestamp. + */ + public static class AddTimestampFn implements Function.MapFunction<String, Tuple.TimestampedTuple<String>> + { + private static final Duration RAND_RANGE = Duration.standardMinutes(10); + private final Long minTimestamp; + + AddTimestampFn() + { + this.minTimestamp = System.currentTimeMillis(); + } + + @Override + public Tuple.TimestampedTuple<String> f(String input) + { + // Generate a timestamp that falls somewhere in the past two hours. + long randMillis = (long)(Math.random() * RAND_RANGE.getMillis()); + long randomTimestamp = minTimestamp + randMillis; + + return new Tuple.TimestampedTuple<>(randomTimestamp, input); + } + } + + /** A MapFunction that converts a Word and Count into a PojoEvent. */ + public static class FormatAsTableRowFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String, Long>>, PojoEvent> + { + @Override + public PojoEvent f(Tuple.WindowedTuple<KeyValPair<String, Long>> input) + { + PojoEvent row = new PojoEvent(); + row.setTimestamp(input.getTimestamp()); + row.setCount(input.getValue().getValue()); + row.setWord(input.getValue().getKey()); + return row; + } + } + + /** + * Populate dag with High-Level API. 
+ * @param dag + * @param conf + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TextInput input = new TextInput(); + Collector collector = new Collector(); + + // Create stream from the TextInput operator. + ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(input, input.output, name("input")) + + // Extract all the words from the input line of text. + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); + } + }, name("ExtractWords")) + + // Wrap the word with a randomly generated timestamp. + .map(new AddTimestampFn(), name("AddTimestampFn")); + + + // apply window and trigger option. + // TODO: change trigger option to atWaterMark when available. + WindowedStream<Tuple.TimestampedTuple<String>> windowedWords = stream + .window(new WindowOption.TimeWindows(Duration.standardMinutes(WINDOW_SIZE)), + new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)); + + + WindowedStream<PojoEvent> wordCounts = + // Perform a countByKey transformation to count the appearance of each word in every time window. + windowedWords.countByKey(new Function.ToKeyValue<Tuple.TimestampedTuple<String>, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(Tuple.TimestampedTuple<String> input) + { + return new Tuple.TimestampedTuple<KeyValPair<String, Long>>(input.getTimestamp(), + new KeyValPair<String, Long>(input.getValue(), 1L)); + } + }, name("count words")) + + // Format the output and print out the result. + .map(new FormatAsTableRowFn(), name("FormatAsTableRowFn")).print(name("console")); + + wordCounts.endWith(collector, collector.input, name("Collector")).populateDag(dag); + } +}
