Repository: apex-malhar Updated Branches: refs/heads/master 0a1adff8a -> 7d9386d2a
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java index fd67bf6..7e5bb93 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java @@ -20,23 +20,22 @@ package com.datatorrent.demos.wordcount; import java.net.URI; -import org.apache.commons.lang.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.api.StreamingApplication; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.appdata.schemas.SchemaUtils; import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; -import com.datatorrent.lib.io.ConsoleOutputOperator; - -import org.apache.hadoop.conf.Configuration; /** * Simple demo that computes word frequencies from any file dropped into a @@ -49,7 +48,7 @@ import org.apache.hadoop.conf.Configuration; * <p> * @since 3.2.0 */ -@ApplicationAnnotation(name="TopNWordsWithQueries") +@ApplicationAnnotation(name = "TopNWordsWithQueries") public class ApplicationWithQuerySupport implements StreamingApplication { private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class); @@ -87,13 +86,13 @@ public class ApplicationWithQuerySupport implements StreamingApplication String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if ( ! StringUtils.isEmpty(gatewayAddress)) { // add query support + if (!StringUtils.isEmpty(gatewayAddress)) { // add query support URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); AppDataSnapshotServerMap snapshotServerFile - = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap()); + = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap()); AppDataSnapshotServerMap snapshotServerGlobal - = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap()); + = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap()); String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON); @@ -108,19 +107,17 @@ public class ApplicationWithQuerySupport implements StreamingApplication snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal); PubSubWebSocketAppDataResult wsResultFile - = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult()); + = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult()); PubSubWebSocketAppDataResult wsResultGlobal - = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult()); + = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult()); wsResultFile.setUri(uri); wsResultGlobal.setUri(uri); Operator.InputPort<String> queryResultFilePort = wsResultFile.input; Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input; - dag.addStream("WordCountsFile", fileWordCount.outputPerFile, - snapshotServerFile.input, console.input); - dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, - snapshotServerGlobal.input); + dag.addStream("WordCountsFile", fileWordCount.outputPerFile, snapshotServerFile.input, console.input); + dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, snapshotServerGlobal.input); dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort); dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort); @@ -129,7 +126,7 @@ public class ApplicationWithQuerySupport implements StreamingApplication dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input); } - System.out.println("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled()); + LOG.info("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled()); LOG.info("Returning from populateDAG"); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java index 8eb004a..e8a91b2 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java @@ -28,11 +28,11 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.common.util.BaseOperator; /** @@ -169,14 +169,14 @@ public class FileWordCount extends BaseOperator * Output port for current file output */ public final transient DefaultOutputPort<List<Map<String, Object>>> - outputPerFile = new DefaultOutputPort<>(); + outputPerFile = new DefaultOutputPort<>(); /** * Output port for global output */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<List<Map<String, Object>>> - outputGlobal = new DefaultOutputPort<>(); + outputGlobal = new DefaultOutputPort<>(); /** * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final @@ -184,12 +184,13 @@ public class FileWordCount extends BaseOperator * {@code endWindow()} call after an EOF */ public final transient DefaultOutputPort<Map<String, Object>> - fileOutput = new DefaultOutputPort<>(); + fileOutput = new DefaultOutputPort<>(); /** * Get the number of top (word, frequency) pairs that will be output */ - public int getTopN() { + public int getTopN() + { return topN; } @@ -197,7 +198,8 @@ public class FileWordCount extends BaseOperator * Set the number of top (word, frequency) pairs that will be output * @param n The new number */ - public void setTopN(int n) { + public void setTopN(int n) + { topN = n; } @@ -250,8 +252,7 @@ public class FileWordCount extends BaseOperator return; } - LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", - fileName, wordMapFile.size(), topN); + LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN); // get topN list for this file and, if we have EOF, emit to fileOutput port @@ -293,13 +294,15 @@ public class FileWordCount extends BaseOperator final ArrayList<WCPair> list = new ArrayList<>(map.values()); // sort entries in descending order of frequency - Collections.sort(list, new Comparator<WCPair>() { - @Override - public int compare(WCPair o1, WCPair o2) { - return (int)(o2.freq - o1.freq); - } + Collections.sort(list, new Comparator<WCPair>() + { + @Override + public int compare(WCPair o1, WCPair o2) + { + return (int)(o2.freq - o1.freq); + } }); - + if (topN > 0) { list.subList(topN, map.size()).clear(); // retain only the first topN entries } @@ -329,13 +332,15 @@ public class FileWordCount extends BaseOperator fileFinalList.addAll(map.values()); // sort entries in descending order of frequency - Collections.sort(fileFinalList, new Comparator<WCPair>() { - @Override - public int compare(WCPair o1, WCPair o2) { - return (int)(o2.freq - o1.freq); - } + Collections.sort(fileFinalList, new Comparator<WCPair>() + { + @Override + public int compare(WCPair o1, WCPair o2) + { + return (int)(o2.freq - o1.freq); + } }); - + if (topN > 0) { fileFinalList.subList(topN, map.size()).clear(); // retain only the first topN entries } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java index fe10d73..8a1a57b 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java @@ -23,13 +23,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import org.apache.hadoop.fs.Path; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import org.apache.hadoop.fs.Path; + import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java index 1817f2b..bb67622 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java @@ -23,8 +23,8 @@ package com.datatorrent.demos.wordcount; * * @since 3.2.0 */ -public class WCPair { - +public class WCPair +{ /** * The word */ @@ -38,20 +38,25 @@ public class WCPair { /** * Default constructor */ - public WCPair() {} + public WCPair() + { + + } /** * Create new object with given values * @param w The word * @param f The frequency */ - public WCPair(String w, int f) { + public WCPair(String w, int f) + { word = w; freq = f; } - + @Override - public String toString() { + public String toString() + { return String.format("(%s, %d)", word, freq); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java index 999f32f..0edfd1e 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java @@ -80,7 +80,9 @@ public class WindowWordCount extends BaseOperator LOG.info("WindowWordCount: endWindow"); // got EOF; if no words found, do nothing - if (wordMap.isEmpty()) return; + if (wordMap.isEmpty()) { + return; + } // have some words; emit single map and reset for next file final ArrayList<WCPair> list = new ArrayList<>(wordMap.values()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java index 3ab20c6..3a88bab 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java @@ -18,84 +18,92 @@ */ package com.datatorrent.demos.wordcount; -import com.datatorrent.lib.io.SimpleSinglePortInputOperator; - -import java.io.*; +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.lib.io.SimpleSinglePortInputOperator; + /** * <p>WordCountInputOperator class.</p> * * @since 0.3.2 */ -public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable { - - private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class); - protected long averageSleep = 300; - protected long sleepPlusMinus = 100; - protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt"; +public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable +{ - public void setAverageSleep(long as) { - averageSleep = as; - } + private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class); + protected long averageSleep = 300; + protected long sleepPlusMinus = 100; + protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt"; - public void setSleepPlusMinus(long spm) { - sleepPlusMinus = spm; - } + public void setAverageSleep(long as) + { + averageSleep = as; + } - public void setFileName(String fn) { - fileName = fn; - } + public void setSleepPlusMinus(long spm) + { + sleepPlusMinus = spm; + } - @Override - public void run() { - BufferedReader br = null; - DataInputStream in = null; - InputStream fstream = null; + public void setFileName(String fn) + { + fileName = fn; + } - while (true) { - try { - String line; - fstream = this.getClass().getClassLoader().getResourceAsStream(fileName); + @Override + public void run() + { + BufferedReader br = null; + DataInputStream in = null; + InputStream fstream = null; - in = new DataInputStream(fstream); - br = new BufferedReader(new InputStreamReader(in)); + while (true) { + try { + String line; + fstream = this.getClass().getClassLoader().getResourceAsStream(fileName); - while ((line = br.readLine()) != null) { - String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'ââ]+"); - for (String word : words) { - word = word.trim().toLowerCase(); - if (!word.isEmpty()) { - //System.out.println("Sending "+word); - outputPort.emit(word); - } - } - try { - Thread.sleep(averageSleep + (new Double(sleepPlusMinus * (Math.random() * 2 - 1))).longValue()); - } catch (InterruptedException ex) { - // nothing - } - } + in = new DataInputStream(fstream); + br = new BufferedReader(new InputStreamReader(in)); - } catch (IOException ex) { - logger.debug(ex.toString()); - } finally { - try { - if (br != null) { - br.close(); - } - if (in != null) { - in.close(); - } - if (fstream != null) { - fstream.close(); - } - } catch (IOException exc) { - // nothing - } + while ((line = br.readLine()) != null) { + String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'ââ]+"); + for (String word : words) { + word = word.trim().toLowerCase(); + if (!word.isEmpty()) { + outputPort.emit(word); } + } + try { + Thread.sleep(averageSleep + (new Double(sleepPlusMinus * (Math.random() * 2 - 1))).longValue()); + } catch (InterruptedException ex) { + // nothing + } + } + + } catch (IOException ex) { + logger.debug(ex.toString()); + } finally { + try { + if (br != null) { + br.close(); + } + if (in != null) { + in.close(); + } + if (fstream != null) { + fstream.close(); + } + } catch (IOException exc) { + // nothing } + } } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java index 62c41d3..30aab10 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java @@ -40,7 +40,7 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje private static final String nl = System.lineSeparator(); private String fileName; // current file name - private transient final StringBuilder sb = new StringBuilder(); + private final transient StringBuilder sb = new StringBuilder(); /** * {@inheritDoc} @@ -83,15 +83,17 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje // get first and only pair; key is the fileName and is ignored here final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next(); - final List<WCPair> list = (List<WCPair>) entry.getValue(); + final List<WCPair> list = (List<WCPair>)entry.getValue(); if (sb.length() > 0) { // clear buffer sb.delete(0, sb.length()); } for ( WCPair pair : list ) { - sb.append(pair.word); sb.append(" : "); - sb.append(pair.freq); sb.append(nl); + sb.append(pair.word); + sb.append(" : "); + sb.append(pair.freq); + sb.append(nl); } final String data = sb.toString(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java index 56d9294..58c44b4 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java @@ -46,8 +46,8 @@ public class WordReader extends BaseOperator /** * Input port on which lines from the current file are received */ - public final transient DefaultInputPort<String> - input = new DefaultInputPort<String>() { + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { @Override public void process(String line) @@ -55,7 +55,9 @@ public class WordReader extends BaseOperator // line; split it into words and emit them final String[] words = nonWord.split(line); for (String word : words) { - if (word.isEmpty()) continue; + if (word.isEmpty()) { + continue; + } output.emit(word); } } @@ -65,7 +67,8 @@ public class WordReader extends BaseOperator * Returns the regular expression that matches strings between words * @return Regular expression for strings that separate words */ - public String getNonWordStr() { + public String getNonWordStr() + { return nonWordStr; } @@ -73,7 +76,8 @@ public class WordReader extends BaseOperator * Sets the regular expression that matches strings between words * @param regex New regular expression for strings that separate words */ - public void setNonWordStr(String regex) { + public void setNonWordStr(String regex) + { nonWordStr = regex; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java index 6fc0dfa..1df0459 100644 --- a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java +++ b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java @@ -18,16 +18,18 @@ */ package com.datatorrent.demos.wordcount; -import com.datatorrent.api.LocalMode; -import com.datatorrent.demos.wordcount.Application; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * */ public class ApplicationTest { + private final transient Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); public ApplicationTest() { } @@ -36,14 +38,14 @@ public class ApplicationTest public void testSomeMethod() throws Exception { LocalMode lma = LocalMode.newInstance(); - Configuration conf =new Configuration(false); + Configuration conf = new Configuration(false); conf.addResource("dt-site-wordcount.xml"); lma.prepareDAG(new Application(), conf); LocalMode.Controller lc = lma.getController(); long start = System.currentTimeMillis(); lc.run(300000); long end = System.currentTimeMillis(); - long time = end -start; - System.out.println("Test used "+time+" ms"); + long time = end - start; + LOG.debug("Test used " + time + " ms"); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java index 1da5b6c..50b306d 100644 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java +++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java @@ -18,28 +18,29 @@ */ package com.datatorrent.demos.yahoofinance; -import com.datatorrent.api.StreamingApplication; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator; import com.datatorrent.lib.streamquery.DerbySqlStreamOperator; -import org.apache.hadoop.conf.Configuration; - /** * This demo will output the stock market data from yahoo finance * * @since 0.3.2 */ -@ApplicationAnnotation(name="YahooFinanceWithDerbySQLDemo") +@ApplicationAnnotation(name = "YahooFinanceWithDerbySQLDemo") public class ApplicationWithDerbySQL implements StreamingApplication { @Override - public void populateDAG(DAG dag, Configuration conf) { - String symbolStr = conf.get(ApplicationWithDerbySQL.class.getName() + ".tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM"); + public void populateDAG(DAG dag, Configuration conf) + { + String symbolStr = conf.get(ApplicationWithDerbySQL.class.getName() + ".tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM"); - String[] symbols = symbolStr.split(","); + String[] symbols = symbolStr.split(","); YahooFinanceCSVInputOperator input1 = dag.addOperator("input1", new YahooFinanceCSVInputOperator()); YahooFinanceCSVInputOperator input2 = dag.addOperator("input2", new YahooFinanceCSVInputOperator()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java index b658575..01e3ce9 100644 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java +++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java @@ -18,26 +18,33 @@ */ package com.datatorrent.demos.yahoofinance; -import au.com.bytecode.opencsv.CSVReader; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.util.KeyValPair; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.cookie.CookiePolicy; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.params.DefaultHttpParams; import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.KeyValPair; + +import au.com.bytecode.opencsv.CSVReader; /** * This operator sends price, volume and time into separate ports and calculates incremental volume. @@ -86,14 +93,14 @@ public class StockTickInput implements InputOperator private String prepareURL() { String str = "http://download.finance.yahoo.com/d/quotes.csv?s="; - for (int i = 0; i < symbols.length; i++) { - if (i != 0) { - str += ","; - } - str += symbols[i]; + for (int i = 0; i < symbols.length; i++) { + if (i != 0) { + str += ","; } - str += "&f=sl1vt1&e=.csv"; - return str; + str += symbols[i]; + } + str += "&f=sl1vt1&e=.csv"; + return str; } @Override @@ -118,8 +125,7 @@ public class StockTickInput implements InputOperator int statusCode = client.executeMethod(method); if (statusCode != HttpStatus.SC_OK) { logger.error("Method failed: " + method.getStatusLine()); - } - else { + } else { InputStream istream = method.getResponseBodyAsStream(); // Process response InputStreamReader isr = new InputStreamReader(istream); @@ -150,11 +156,9 @@ public class StockTickInput implements InputOperator } } Thread.sleep(readIntervalMillis); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { logger.debug(ex.toString()); - } - catch (IOException ex) { + } catch (IOException ex) { logger.debug(ex.toString()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java index debf91d..a6aaece 100644 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java +++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java @@ -18,7 +18,7 @@ */ package com.datatorrent.demos.yahoofinance; - +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; @@ -32,7 +32,6 @@ import com.datatorrent.lib.multiwindow.SimpleMovingAverage; import com.datatorrent.lib.stream.ConsolidatorKeyVal; import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec; import com.datatorrent.lib.util.HighLow; -import org.apache.hadoop.conf.Configuration; /** * Yahoo! Finance Application Demo :<br> @@ -191,7 +190,7 @@ import org.apache.hadoop.conf.Configuration; * * @since 0.3.2 */ -@ApplicationAnnotation(name="YahooFinanceDemo") +@ApplicationAnnotation(name = "YahooFinanceDemo") public class YahooFinanceApplication implements StreamingApplication { protected int streamingWindowSizeMilliSeconds = 1000; // 1 second @@ -329,8 +328,8 @@ public class YahooFinanceApplication implements StreamingApplication /** * Populate Yahoo Finance Demo Application DAG. */ - @SuppressWarnings("unchecked") - @Override + @SuppressWarnings("unchecked") + @Override public void populateDAG(DAG dag, Configuration conf) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java index 0cdbfbb..cf3801e 100644 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java +++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java @@ -18,23 +18,26 @@ */ package com.datatorrent.demos.yahoofinance; -import au.com.bytecode.opencsv.CSVReader; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.io.SimpleSinglePortInputOperator; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpStatus; import org.apache.commons.httpclient.cookie.CookiePolicy; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.params.DefaultHttpParams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.SimpleSinglePortInputOperator; + +import au.com.bytecode.opencsv.CSVReader; /** * Grabs Yahoo Finance quotes data and emits HashMap, with key equals the format name (e.g. "s0") <p> @@ -103,7 +106,7 @@ public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator< if (i != 0) { str += ","; } - str += symbolList.get(i); + str += symbolList.get(i); } str += "&f="; for (String format: parameterList) { @@ -129,9 +132,8 @@ public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator< try { int statusCode = client.executeMethod(method); if (statusCode != HttpStatus.SC_OK) { - System.err.println("Method failed: " + method.getStatusLine()); - } - else { + logger.error("Method failed: " + method.getStatusLine()); + } else { InputStream istream; istream = method.getResponseBodyAsStream(); // Process response @@ -148,11 +150,9 @@ public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator< } } Thread.sleep(readIntervalMillis); - } - catch (InterruptedException ex) { + } catch (InterruptedException ex) { logger.debug(ex.toString()); - } - catch (IOException ex) { + } catch (IOException ex) { logger.debug(ex.toString()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java index 587f9de..c038e61 100644 --- a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java +++ b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java @@ -18,10 +18,9 @@ */ package com.datatorrent.demos.yahoofinance; -import com.datatorrent.api.LocalMode; -import com.datatorrent.demos.yahoofinance.YahooFinanceApplication; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * Run Yahoo Finance application demo. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java index b430f92..7b134f5 100644 --- a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java +++ b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java @@ -18,16 +18,18 @@ */ package com.datatorrent.demos.yahoofinance; -import com.datatorrent.api.LocalMode; -import com.datatorrent.demos.yahoofinance.ApplicationWithDerbySQL; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * */ public class ApplicationWithDerbySQLTest { + private final transient Logger LOG = LoggerFactory.getLogger(ApplicationWithDerbySQLTest.class); public ApplicationWithDerbySQLTest() { } @@ -42,7 +44,7 @@ public class ApplicationWithDerbySQLTest long start = System.currentTimeMillis(); lc.run(); long end = System.currentTimeMillis(); - long time = end -start; - System.out.println("Test used "+time+" ms"); + long time = end - start; + LOG.debug("Test used " + time + " ms"); } }
