http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java deleted file mode 100644 index e8a91b2..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package com.datatorrent.demos.wordcount;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;

/**
 * Computes word frequencies per file and globally, and writes the top N pairs to an output file
 * and to snapshot servers for visualization.
 * Currently designed to work with only 1 file at a time; will be enhanced later to support
 * multiple files dropped into the monitored directory at the same time.
 *
 * <p>
 * Receives per-window list of pairs (word, frequency) on the input port. When the end of a file
 * is reached, expects to get an EOF on the control port; at the next endWindow, the top N words
 * and frequencies are computed and emitted to the output ports.
 * <p>
 * There are 3 output ports: (a) One for the per-file top N counts emitted when the file is fully
 * read and is written to the output file. (b) One for the top N counts emitted per window for the
 * current file to the snapshot server and (c) One for the global top N counts emitted per window
 * to a different snapshot server.
 *
 * Since the EOF is received by a single operator, this operator cannot be partitionable
 *
 * @since 3.2.0
 */
public class FileWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);
  private static final String GLOBAL = "global";

  /**
   * Orders pairs by descending frequency. Uses {@link Integer#compare(int, int)} rather than
   * subtracting the two frequencies, because a subtraction can overflow and yield the wrong sign.
   */
  private static final Comparator<WCPair> BY_FREQ_DESC = new Comparator<WCPair>()
  {
    @Override
    public int compare(WCPair o1, WCPair o2)
    {
      return Integer.compare(o2.freq, o1.freq);
    }
  };

  /**
   * If {@literal topN > 0}, only data for the topN most frequent words is output; if topN == 0, the
   * entire frequency map is output
   */
  protected int topN;

  /**
   * Set to true when an EOF control tuple for the current input file is received; reset to false
   * when the corresponding output file has been written.
   */
  protected boolean eof = false;

  /**
   * Last component of path (just the file name)
   * incoming value from control tuple
   */
  protected String fileName;

  /**
   * {@literal (word => frequency)} map: current file, all words
   */
  protected Map<String, WCPair> wordMapFile = new HashMap<>();

  /**
   * {@literal (word => frequency)} map: global, all words
   */
  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();

  /**
   * List with per file data, one map per (word, frequency) pair; sent on {@code outputPerFile}
   */
  protected transient List<Map<String, Object>> resultPerFile;

  /**
   * List with global data, one map per (word, frequency) pair; sent on {@code outputGlobal}
   */
  protected transient List<Map<String, Object>> resultGlobal;

  /**
   * Singleton map of {@code fileName} to sorted list of (word, frequency) pairs
   */
  protected transient Map<String, Object> resultFileFinal;

  /**
   * final list of (word, frequency) pairs written to output file
   */
  protected transient List<WCPair> fileFinalList;

  /**
   * Input port on which the per-window list of (word, frequency) pairs is received; each pair
   * is merged into {@code wordMapFile} and {@code wordMapGlobal}.
   */
  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      // blend incoming list into wordMapFile and wordMapGlobal
      for (WCPair pair : list) {
        addFrequency(wordMapFile, pair.word, pair.freq);
        addFrequency(wordMapGlobal, pair.word, pair.freq);
      }
    }
  };

  /**
   * Control port on which the current file name is received to indicate EOF
   */
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

  /**
   * Output port for current file output
   */
  public final transient DefaultOutputPort<List<Map<String, Object>>>
      outputPerFile = new DefaultOutputPort<>();

  /**
   * Output port for global output
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<List<Map<String, Object>>>
      outputGlobal = new DefaultOutputPort<>();

  /**
   * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final
   * top N pairs for current file and will be written to the output file; emitted in the
   * {@code endWindow()} call after an EOF
   */
  public final transient DefaultOutputPort<Map<String, Object>>
      fileOutput = new DefaultOutputPort<>();

  /**
   * Get the number of top (word, frequency) pairs that will be output
   * @return the current top N value
   */
  public int getTopN()
  {
    return topN;
  }

  /**
   * Set the number of top (word, frequency) pairs that will be output
   * @param n The new number
   */
  public void setTopN(int n)
  {
    topN = n;
  }

  /**
   * {@inheritDoc}
   * Initialize various map and list fields
   */
  @Override
  public void setup(OperatorContext context)
  {
    if (null == wordMapFile) {
      wordMapFile = new HashMap<>();
    }
    if (null == wordMapGlobal) {
      wordMapGlobal = new HashMap<>();
    }
    resultPerFile = new ArrayList<>(1);
    resultGlobal = new ArrayList<>(1);
    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
    resultFileFinal = new HashMap<>(1);
    fileFinalList = new ArrayList<>();
  }

  /**
   * {@inheritDoc}
   * This is where we do most of the work:
   * 1. Sort global map and emit top N pairs
   * 2. Sort current file map and emit top N pairs
   * 3. If we've seen EOF, emit top N pairs on port connected to file writer and clear all per-file
   *    data structures.
   */
  @Override
  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);

    if (wordMapFile.isEmpty()) {    // no words found
      // log before the reset below so the message still names the file
      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
      if (eof) {
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        emitFinalAndReset();
      }
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN);

    // get topN global list and emit to global output port
    getTopNMap(wordMapGlobal, resultGlobal);
    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
    outputGlobal.emit(resultGlobal);

    // get topN list for this file and emit to file output port
    getTopNMap(wordMapFile, resultPerFile);
    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
    outputPerFile.emit(resultPerFile);

    if (eof) {     // got EOF earlier
      if (null == fileName) {      // need file name to emit topN pairs to file writer
        throw new RuntimeException("EOF but no fileName at endWindow");
      }

      // so compute final topN list from wordMapFile into fileFinalList and emit it
      getTopNList(wordMapFile);
      wordMapFile.clear();
      emitFinalAndReset();
    }
  }

  /**
   * Adds {@code freq} to the entry for {@code word} in {@code map}, creating the entry if absent.
   * A fresh {@code WCPair} is stored so the incoming tuple's pair object is never shared.
   */
  private static void addFrequency(Map<String, WCPair> map, String word, int freq)
  {
    final WCPair known = map.get(word);
    if (null == known) {
      map.put(word, new WCPair(word, freq));
    } else {
      known.freq += freq;
    }
  }

  /**
   * Emits {@code fileName => fileFinalList} on {@code fileOutput} and resets the per-file
   * EOF state for the next file.
   */
  private void emitFinalAndReset()
  {
    resultFileFinal.put(fileName, fileFinalList);
    fileOutput.emit(resultFileFinal);

    // NOTE(review): the emitted map is cleared right after emit(); this presumes the downstream
    // consumer is done with the tuple once emit() returns -- confirm for the stream locality used.
    eof = false;
    fileName = null;
    resultFileFinal.clear();
  }

  /**
   * Drops everything after the first {@code topN} entries of {@code list}. Does nothing when
   * {@code topN <= 0} (meaning "keep all") or when the list already has at most topN entries;
   * without the second check {@code subList} would throw for short lists.
   */
  private void retainTopN(List<WCPair> list)
  {
    if (topN > 0 && topN < list.size()) {
      list.subList(topN, list.size()).clear();
    }
  }

  /**
   * Get topN frequencies from map, convert each pair to a two-entry map
   * {@code {"word": <word>, "count": <freq>}} and store the maps in {@code result}, which is
   * cleared first. This list is suitable input to AppDataSnapshotServer.
   */
  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
  {
    final List<WCPair> list = new ArrayList<>(map.values());

    // sort entries in descending order of frequency
    Collections.sort(list, BY_FREQ_DESC);
    retainTopN(list);

    result.clear();
    for (WCPair pair : list) {
      Map<String, Object> wmap = new HashMap<>(2);
      wmap.put("word", pair.word);
      wmap.put("count", pair.freq);
      result.add(wmap);
    }
    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
  }

  /**
   * Populate fileFinalList with topN frequencies from argument
   * This list is suitable input to WordCountWriter which writes it to a file
   */
  private void getTopNList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());

    // sort entries in descending order of frequency
    Collections.sort(fileFinalList, BY_FREQ_DESC);
    retainTopN(fileFinalList);
    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java deleted file mode 100644 index 8a1a57b..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.wordcount; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.fs.Path; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.io.fs.AbstractFileInputOperator; - -/** - * Reads lines from input file and returns them. 
If EOF is reached, a control tuple - * is emitted on the control port - * - * @since 3.2.0 - */ -public class LineReader extends AbstractFileInputOperator<String> -{ - private static final Logger LOG = LoggerFactory.getLogger(LineReader.class); - - /** - * Output port on which lines from current file name are emitted - */ - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); - - /** - * Control port on which the current file name is emitted to indicate EOF - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); - - private transient BufferedReader br = null; - - private Path path; - - /** - * File open callback; wrap the file input stream in a buffered reader for reading lines - * @param curPath The path to the file just opened - */ - @Override - protected InputStream openFile(Path curPath) throws IOException - { - LOG.info("openFile: curPath = {}", curPath); - path = curPath; - InputStream is = super.openFile(path); - br = new BufferedReader(new InputStreamReader(is)); - return is; - } - - /** - * File close callback; close buffered reader - * @param is File input stream that will imminently be closed - */ - @Override - protected void closeFile(InputStream is) throws IOException - { - super.closeFile(is); - br.close(); - br = null; - path = null; - } - - /** - * {@inheritDoc} - * If we hit EOF, emit file name on control port - */ - @Override - protected String readEntity() throws IOException - { - // try to read a line - final String line = br.readLine(); - if (null != line) { // common case - LOG.debug("readEntity: line = {}", line); - return line; - } - - // end-of-file; send control tuple, containing only the last component of the path - // (only file name) on control port - // - if (control.isConnected()) { - LOG.info("readEntity: EOF for {}", path); - final String name = path.getName(); // final component of path - control.emit(name); - } - - 
return null; - } - - @Override - protected void emit(String tuple) - { - output.emit(tuple); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java deleted file mode 100644 index bb67622..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.wordcount; - -/** - * A single (word, frequency) pair - * - * @since 3.2.0 - */ -public class WCPair -{ - /** - * The word - */ - public String word; - - /** - * The frequency - */ - public int freq; - - /** - * Default constructor - */ - public WCPair() - { - - } - - /** - * Create new object with given values - * @param w The word - * @param f The frequency - */ - public WCPair(String w, int f) - { - word = w; - freq = f; - } - - @Override - public String toString() - { - return String.format("(%s, %d)", word, freq); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java deleted file mode 100644 index 0edfd1e..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package com.datatorrent.demos.wordcount;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

/**
 * Computes word frequencies per window and emits them at each {@code endWindow()}. The output is a
 * list of (word, frequency) pairs
 *
 * @since 3.2.0
 */
public class WindowWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class);

  /** {@literal (word => frequency)} map for current window */
  protected Map<String, WCPair> wordMap = new HashMap<>();

  /**
   * Input port on which words are received
   */
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      WCPair pair = wordMap.get(word);
      if (null != pair) {    // word seen previously
        pair.freq += 1;
        return;
      }

      // new word
      pair = new WCPair();
      pair.word = word;
      pair.freq = 1;
      wordMap.put(word, pair);
    }
  };

  /**
   * Output port which emits the list of word frequencies for current window
   */
  public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>();

  /**
   * {@inheritDoc}
   * If we've seen some words in this window, emit them as a list and clear the map for the
   * next window. Nothing is emitted for a window without words.
   */
  @Override
  public void endWindow()
  {
    LOG.info("WindowWordCount: endWindow");

    // end of window; if no words were seen, do nothing
    if (wordMap.isEmpty()) {
      return;
    }

    // have some words; emit a single list and reset the map for the next window.
    // The list is a fresh copy of the map's values and is deliberately NOT cleared after
    // emit(): it has been handed to the output port, and mutating it here could empty the
    // tuple before a downstream consumer has read it.
    final List<WCPair> list = new ArrayList<>(wordMap.values());
    output.emit(list);
    wordMap.clear();
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java deleted file mode 100644 index 3a88bab..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package com.datatorrent.demos.wordcount;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.lib.io.SimpleSinglePortInputOperator;

/**
 * Input operator that repeatedly reads a text resource from the classpath, splits every line
 * into lower-cased words and emits each word on the output port, pausing for a randomized
 * interval after every line.
 *
 * @since 0.3.2
 */
public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable
{

  private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class);

  /**
   * Word separators: punctuation, whitespace, straight quotes and the curly double quotes
   * U+201C / U+201D (written as unicode escapes so the source stays encoding-independent).
   * Compiled once instead of once per line.
   */
  private static final Pattern WORD_SEPARATORS = Pattern.compile("[\\p{Punct}\\s\\\"\\'\u201C\u201D]+");

  /** Mean pause after each line, in milliseconds */
  protected long averageSleep = 300;
  /** Maximum random deviation from {@code averageSleep}, in milliseconds */
  protected long sleepPlusMinus = 100;
  /** Classpath resource to read */
  protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt";

  public void setAverageSleep(long as)
  {
    averageSleep = as;
  }

  public void setSleepPlusMinus(long spm)
  {
    sleepPlusMinus = spm;
  }

  public void setFileName(String fn)
  {
    fileName = fn;
  }

  /**
   * Reads the resource over and over, emitting its words, until the running thread is
   * interrupted. An interrupt received while sleeping restores the interrupt flag and ends
   * the method, so the thread can actually be stopped.
   *
   * @throws IllegalStateException if {@code fileName} cannot be found on the classpath
   */
  @Override
  public void run()
  {
    while (!Thread.currentThread().isInterrupted()) {
      try (InputStream fstream = this.getClass().getClassLoader().getResourceAsStream(fileName)) {
        if (fstream == null) {
          // fail with a clear message instead of an obscure NullPointerException on first read
          throw new IllegalStateException("Resource not found on classpath: " + fileName);
        }
        try (BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(fstream)))) {
          String line;
          while ((line = br.readLine()) != null) {
            emitWords(line);
            try {
              Thread.sleep(nextSleepMillis());
            } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        }
      } catch (IOException ex) {
        logger.debug(ex.toString());
      }
    }
  }

  /** Splits {@code line} on the separator pattern and emits every non-empty lower-cased word. */
  private void emitWords(String line)
  {
    for (String word : WORD_SEPARATORS.split(line.trim())) {
      word = word.trim().toLowerCase();
      if (!word.isEmpty()) {
        outputPort.emit(word);
      }
    }
  }

  /**
   * Picks the next pause: {@code averageSleep} plus a uniform random offset in
   * {@code (-sleepPlusMinus, +sleepPlusMinus)}, clamped at zero because
   * {@link Thread#sleep(long)} rejects negative values.
   */
  private long nextSleepMillis()
  {
    final long jitter = (long)(sleepPlusMinus * (Math.random() * 2 - 1));
    return Math.max(0L, averageSleep + jitter);
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java deleted file mode 100644 index 30aab10..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package com.datatorrent.demos.wordcount;

import java.io.UnsupportedEncodingException;

import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

/**
 * File output operator that writes the top N (word, frequency) pairs it receives, one
 * {@code word : frequency} line per pair.
 *
 * @since 3.2.0
 */
public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>>
{
  private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class);
  private static final String charsetName = "UTF-8";
  private static final String nl = System.lineSeparator();

  private String fileName;    // name taken from the most recent tuple
  private final transient StringBuilder sb = new StringBuilder();

  /**
   * {@inheritDoc}
   * Once a file name has been seen, calls requestFinalize() for it before delegating to the
   * superclass.
   */
  @Override
  public void endWindow()
  {
    final boolean haveName = (fileName != null);
    if (haveName) {
      requestFinalize(fileName);
    }
    super.endWindow();
  }

  /**
   * Takes the key of the first map entry as the file name and remembers it
   * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs}
   * @return the file name to write the tuple to
   */
  @Override
  protected String getFileName(Map<String, Object> tuple)
  {
    LOG.info("getFileName: tuple.size = {}", tuple.size());

    fileName = tuple.keySet().iterator().next();
    LOG.info("getFileName: fileName = {}", fileName);
    return fileName;
  }

  /**
   * Renders the list stored under the first map entry as text and encodes it
   * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs}
   * @return input tuple converted to an array of bytes
   */
  @Override
  protected byte[] getBytesForTuple(Map<String, Object> tuple)
  {
    LOG.info("getBytesForTuple: tuple.size = {}", tuple.size());

    // only the value of the first (and only) entry matters here; the key is the file name
    final Object value = tuple.values().iterator().next();
    final List<WCPair> pairs = (List<WCPair>)value;

    // start from an empty buffer
    sb.setLength(0);

    for (WCPair item : pairs) {
      sb.append(item.word).append(" : ").append(item.freq).append(nl);
    }

    final String data = sb.toString();
    LOG.info("getBytesForTuple: data = {}", data);

    final byte[] encoded;
    try {
      encoded = data.getBytes(charsetName);
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException("Should never get here", ex);
    }
    return encoded;
  }

}
// http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java deleted file mode 100644 index 58c44b4..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
package com.datatorrent.demos.wordcount;

import java.util.regex.Pattern;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

/**
 * Splits each incoming line into words and emits the words one by one
 *
 * @since 3.3.0
 */
public class WordReader extends BaseOperator
{
  // separator pattern used when no custom regex has been configured
  private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+");

  private String nonWordStr;              // optional user-supplied separator regex
  private transient Pattern nonWord;      // pattern actually used for splitting; chosen in setup()

  /**
   * Output port on which words from the current file are emitted
   */
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  /**
   * Input port on which lines from the current file are received
   */
  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {

    @Override
    public void process(String line)
    {
      // cut the line at every separator match and forward each non-empty piece
      for (String token : nonWord.split(line)) {
        if (!token.isEmpty()) {
          output.emit(token);
        }
      }
    }
  };

  /**
   * Returns the configured separator regex
   * @return Regular expression for strings that separate words
   */
  public String getNonWordStr()
  {
    return nonWordStr;
  }

  /**
   * Stores the separator regex; it is compiled when {@code setup} runs
   * @param regex New regular expression for strings that separate words
   */
  public void setNonWordStr(String regex)
  {
    nonWordStr = regex;
  }

  /**
   * {@inheritDoc}
   * Selects the splitting pattern: the default one when no regex was configured, otherwise the
   * compiled form of the configured regex
   */
  @Override
  public void setup(OperatorContext context)
  {
    nonWord = (nonWordStr != null) ? Pattern.compile(nonWordStr) : nonWordDefault;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg deleted file mode 100644 index 054baed..0000000 Binary files a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/doc-files/UniqueWordCounter.jpg and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java deleted file mode 100644 index d00397d..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -/** - * Streaming word count demonstration application. - */ -package com.datatorrent.demos.wordcount; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/resources/META-INF/properties.xml b/demos/wordcount/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 1d3594e..0000000 --- a/demos/wordcount/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,98 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <!-- TopNWordsWithQueries --> - - <!-- for debugging --> - <!-- - <property> - <name>dt.attr.CONTAINER_JVM_OPTIONS</name> - <value>-Dlog4j.configuration=my_log4j.properties</value> - </property> - --> - - <!-- monitored input directory --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name> - <value>/tmp/test/input-dir</value> - </property> - - <!-- regular expression for word separator --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name> - <value>[\p{Punct}\s]+</value> - </property> - - <!-- output directory for word counts --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name> - <value>/tmp/test/output-dir</value> - </property> - - <!-- Top N value --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name> - <value>10</value> - </property> - - <!-- topic for queries (current file) --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name> - <value>TopNWordsQueryFile</value> - </property> - - <!-- topic for query results (current file) --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name> - <value>TopNWordsQueryFileResult</value> - </property> - - <!-- topic for queries (global) --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name> - <value>TopNWordsQueryGlobal</value> - </property> - - <!-- topic for query results (global) --> - <property> - <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name> - <value>TopNWordsQueryGlobalResult</value> - </property> - - <!-- retry count --> - <property> - <name>dt.application.TwitterDemo.operator.wsResult.numRetries</name> - <value>2147483647</value> - </property> - - - <!-- WordCountDemo --> - <property> - 
<name>dt.application.WordCountDemo.operator.wordinput.fileName</name> - <value>samplefile.txt</value> - </property> - <property> - <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name> - <value>CONTAINER_LOCAL</value> - <description>Specify container locality for the viewtuplecount stream - </description> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/WordDataSchema.json ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/resources/WordDataSchema.json b/demos/wordcount/src/main/resources/WordDataSchema.json deleted file mode 100644 index 5e8e7c0..0000000 --- a/demos/wordcount/src/main/resources/WordDataSchema.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "values": [{"name": "word", "type": "string"}, - {"name": "count", "type": "integer"}] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt b/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt deleted file mode 100644 index 83eaaed..0000000 --- a/demos/wordcount/src/main/resources/com/datatorrent/demos/wordcount/samplefile.txt +++ /dev/null @@ -1 +0,0 @@ -CONTENT DELETED http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/resources/samplefile.txt ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/resources/samplefile.txt b/demos/wordcount/src/main/resources/samplefile.txt deleted file mode 100644 index 02a5e70..0000000 --- a/demos/wordcount/src/main/resources/samplefile.txt +++ /dev/null @@ -1,2 +0,0 @@ -CONTENT DELETED - 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/site/conf/my-app-conf1.xml b/demos/wordcount/src/site/conf/my-app-conf1.xml deleted file mode 100644 index f35873b..0000000 --- a/demos/wordcount/src/site/conf/my-app-conf1.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java deleted file mode 100644 index 1df0459..0000000 --- a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.wordcount; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; - -/** - * - */ -public class ApplicationTest -{ - private final transient Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); - public ApplicationTest() - { - } - - @Test - public void testSomeMethod() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource("dt-site-wordcount.xml"); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - long start = System.currentTimeMillis(); - lc.run(300000); - long end = System.currentTimeMillis(); - long time = end - start; - LOG.debug("Test used " + time + " ms"); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/resources/dt-site-wordcount.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/test/resources/dt-site-wordcount.xml b/demos/wordcount/src/test/resources/dt-site-wordcount.xml deleted file mode 100644 index a25dac4..0000000 --- a/demos/wordcount/src/test/resources/dt-site-wordcount.xml +++ /dev/null @@ -1,37 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<configuration> - <property> - <name>dt.application.WordCountDemo.class</name> - <value>com.datatorrent.demos.wordcount.Application</value> - <description>An alias for the application</description> - </property> - <property> - <name>dt.application.WordCountDemo.operator.wordinput.fileName</name> - <value>samplefile.txt</value> - </property> - <property> - <name>dt.application.WordCountDemo.stream.wordinput.count.locality</name> - <value>CONTAINER_LOCAL</value> - <description>Specify container locality for the viewtuplecount stream - </description> - </property> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/test/resources/log4j.properties b/demos/wordcount/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/wordcount/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/pom.xml ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/pom.xml b/demos/yahoofinance/pom.xml deleted file mode 100644 index 819a475..0000000 --- a/demos/yahoofinance/pom.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. 
See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>yahoo-finance-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Yahoo! 
Finance Demo</name> - <description>Apex demo applications that get Yahoo finance feed and calculate minute price range, minute volume and simple moving average.</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <groupId>net.sf.opencsv</groupId> - <artifactId>opencsv</artifactId> - <version>2.0</version> - </dependency> - <dependency> - <groupId>org.apache.derby</groupId> - <artifactId>derby</artifactId> - <version>10.9.1.0</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/assemble/appPackage.xml b/demos/yahoofinance/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/yahoofinance/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java 
b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java deleted file mode 100644 index 1a38495..0000000 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.yahoofinance; - -import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator; -import org.apache.apex.malhar.contrib.misc.streamquery.DerbySqlStreamOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; - -/** - * This demo will output the stock market data from yahoo finance - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = "YahooFinanceWithDerbySQLDemo") -public class ApplicationWithDerbySQL implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - String symbolStr = conf.get(ApplicationWithDerbySQL.class.getName() + ".tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM"); - - String[] symbols = symbolStr.split(","); - - YahooFinanceCSVInputOperator input1 = dag.addOperator("input1", new YahooFinanceCSVInputOperator()); - YahooFinanceCSVInputOperator input2 = dag.addOperator("input2", new YahooFinanceCSVInputOperator()); - DerbySqlStreamOperator sqlOper = dag.addOperator("sqlOper", new DerbySqlStreamOperator()); - ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator()); - - for (String symbol : symbols) { - input1.addSymbol(symbol); - input2.addSymbol(symbol); - } - input1.addFormat("s0"); - input1.addFormat("l1"); - input2.addFormat("s0"); - input2.addFormat("e0"); - input2.addFormat("b4"); - - AbstractSqlStreamOperator.InputSchema inputSchema1 = new AbstractSqlStreamOperator.InputSchema("t1"); - AbstractSqlStreamOperator.InputSchema inputSchema2 = new AbstractSqlStreamOperator.InputSchema("t2"); - inputSchema1.setColumnInfo("s0", "varchar(100)", true); // symbol - inputSchema1.setColumnInfo("l1", "float", false); // last trade - inputSchema2.setColumnInfo("s0", "varchar(100)", true); // symbol - 
inputSchema2.setColumnInfo("e0", "float", false); // EPS - inputSchema2.setColumnInfo("b4", "float", false); // Book value - - sqlOper.setInputSchema(0, inputSchema1); - sqlOper.setInputSchema(1, inputSchema2); - - // Calculate PE Ratio and PB Ratio using SQL - sqlOper.addExecStatementString("SELECT SESSION.t1.s0 AS symbol, SESSION.t1.l1 / SESSION.t2.e0 AS pe_ratio, SESSION.t1.l1 / SESSION.t2.b4 AS pb_ratio FROM SESSION.t1,SESSION.t2 WHERE SESSION.t1.s0 = SESSION.t2.s0"); - - dag.addStream("input1_sql", input1.outputPort, sqlOper.in1); - dag.addStream("input2_sql", input2.outputPort, sqlOper.in2); - - dag.addStream("result_console", sqlOper.result, consoleOperator.input); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java deleted file mode 100644 index 01e3ce9..0000000 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.yahoofinance; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import javax.validation.constraints.NotNull; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpStatus; -import org.apache.commons.httpclient.cookie.CookiePolicy; -import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.httpclient.params.DefaultHttpParams; -import org.apache.hadoop.util.StringUtils; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.util.KeyValPair; - -import au.com.bytecode.opencsv.CSVReader; - -/** - * This operator sends price, volume and time into separate ports and calculates incremental volume. - * - * @since 0.3.2 - */ -public class StockTickInput implements InputOperator -{ - private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class); - /** - * Timeout interval for reading from server. 0 or negative indicates no timeout. - */ - public int readIntervalMillis = 500; - /** - * The URL of the web service resource for the POST request. - */ - private String url; - private String[] symbols; - @NotNull - private String tickers; - private transient HttpClient client; - private transient GetMethod method; - private HashMap<String, Long> lastVolume = new HashMap<String, Long>(); - private boolean outputEvenIfZeroVolume = false; - /** - * The output port to emit price. 
- */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>(); - /** - * The output port to emit incremental volume. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>(); - /** - * The output port to emit last traded time. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>(); - - /** - * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1 - * - * @return the URL - */ - private String prepareURL() - { - String str = "http://download.finance.yahoo.com/d/quotes.csv?s="; - for (int i = 0; i < symbols.length; i++) { - if (i != 0) { - str += ","; - } - str += symbols[i]; - } - str += "&f=sl1vt1&e=.csv"; - return str; - } - - @Override - public void setup(OperatorContext context) - { - url = prepareURL(); - client = new HttpClient(); - method = new GetMethod(url); - DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); - } - - @Override - public void teardown() - { - } - - @Override - public void emitTuples() - { - - try { - int statusCode = client.executeMethod(method); - if (statusCode != HttpStatus.SC_OK) { - logger.error("Method failed: " + method.getStatusLine()); - } else { - InputStream istream = method.getResponseBodyAsStream(); - // Process response - InputStreamReader isr = new InputStreamReader(istream); - CSVReader reader = new CSVReader(isr); - List<String[]> myEntries = reader.readAll(); - for (String[] stringArr: myEntries) { - ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(stringArr)); - if (tuple.size() != 4) { - 
return; - } - // input csv is <Symbol>,<Price>,<Volume>,<Time> - String symbol = tuple.get(0); - double currentPrice = Double.valueOf(tuple.get(1)); - long currentVolume = Long.valueOf(tuple.get(2)); - String timeStamp = tuple.get(3); - long vol = currentVolume; - // Sends total volume in first tick, and incremental volume afterwards. - if (lastVolume.containsKey(symbol)) { - vol -= lastVolume.get(symbol); - } - - if (vol > 0 || outputEvenIfZeroVolume) { - price.emit(new KeyValPair<String, Double>(symbol, currentPrice)); - volume.emit(new KeyValPair<String, Long>(symbol, vol)); - time.emit(new KeyValPair<String, String>(symbol, timeStamp)); - lastVolume.put(symbol, currentVolume); - } - } - } - Thread.sleep(readIntervalMillis); - } catch (InterruptedException ex) { - logger.debug(ex.toString()); - } catch (IOException ex) { - logger.debug(ex.toString()); - } - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume) - { - this.outputEvenIfZeroVolume = outputEvenIfZeroVolume; - } - - public void setTickers(String tickers) - { - this.tickers = tickers; - symbols = StringUtils.split(tickers, ','); - } - - public String getTickers() - { - return tickers; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java deleted file mode 100644 index a6aaece..0000000 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java +++ /dev/null @@ -1,365 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.yahoofinance; - -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.Operator.InputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.math.RangeKeyVal; -import com.datatorrent.lib.math.SumKeyVal; -import com.datatorrent.lib.multiwindow.SimpleMovingAverage; -import com.datatorrent.lib.stream.ConsolidatorKeyVal; -import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec; -import com.datatorrent.lib.util.HighLow; - -/** - * Yahoo! Finance Application Demo :<br> - * Get Yahoo finance feed and calculate minute price range, minute volume, - * simple moving average of 5 minutes. <br> - * <br> - * Functional Description : <br> - * Application samples yahoo finance ticker every 200ms. All data points in one - * second are streamed from input adapter. 
<br> - * <br> - * - * Application calculates following Real Time Value(s):<br> - * <ul> - * <li>Quotes for IBM, Google, Apple, Yahoo stocks price/volume/time displayed - * every second.</li> - * <li>Charts for Stocks in terms of high/low price vs volume for last minute.</li> - * <li>Simple moving average over last 5 minutes for IBM, Google, Apple, Yahoo - * stocks.</li> - * </ul> - * <br> - * <br> - * - * Custom Attribute : <br> - * <ul> - * <li>Application streaming window size(STREAMING_WINDOW_SIZE_MILLIS) = 1 sec, - * since we are only interested in quotes every second.</li> - * <li>Range/Minute Volume operator's window size(APPLICATION_WINDOW_COUNT) = - * 60, aggregate over one minute.</li> - * <li>Sum operator window length : 300, sliding average over last 5 minutes.</li> - * </ul> - * <br> - * - * Input Adapter : <br> - * Stock Tick input operator gets yahoo finance real time stock quotes data and - * pushes it to the application. <br> - * <br> - * - * Output Adapter : <br> - * Output values are written to console through ConsoleOutputOperator<br> - * if you need to change write to HDFS,HTTP .. instead of console, <br> - * Please refer to {@link com.datatorrent.lib.io.HttpOutputOperator} or - * {@link com.datatorrent.lib.io.fs.HdfsOutputOperator}. 
<br> - * <br> - * - * Run Sample Application : <br> - * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new YahooFinanceApplication(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see the following output on - * console: - * - * <pre> - * Price SMA: AAPL=435.965 - * Price SMA: GOOG=877.0 - * QUOTE: {YHOO=[26.37, 9760360, 4:00pm, null, null], IBM=[203.77, 2899698, 4:00pm, null, null], GOOG=[877.0, 2069614, 4:00pm, null, null], AAPL=[435.965, 10208099, 4:00pm, null, null]} - * Price SMA: YHOO=26.37 - * </pre> - * - * Scaling Options : <br> - * <ul> - * <li>Volume operator can be replicated using a {@link StatelessPartitioner} - * on an operator.</li> - * <li>Range value operator can be replicated, but requires a proper unifier - * operator (read App Dev Guide).</li> - * <li>Sliding window operator can be replicated with a proper unifier operator.</li> - * </ul> - * <br> - * - * Application DAG : <br> - * <img src="doc-files/Application.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 1000 ms(1 Sec) <br> - * Operator Details : <br> - * <ul> - * <li> - * <p> - * <b>The operator DailyVolume:</b> This operator reads from the input port, - * which contains the incremental volume tuples from StockTickInput, and - * aggregates the data to provide the cumulative volume. It just utilizes the - * library class {@code SumKeyVal<K,V>} provided in math package. In this case, - * {@code SumKeyVal<String,Long>}, where K is the stock symbol, V is the aggregated - * volume, with cumulative set to true. (Otherwise if cumulative was set to - * false, SumKeyVal would provide the sum for the application window.) The platform - * provides a number of built-in operators for simple operations like this so - * that application developers do not have to write them. More examples to - * follow. This operator assumes that the application restarts before market - * opens every day. 
- * </p> - * Class : {@link com.datatorrent.lib.math.SumKeyVal} <br> - * Operator Application Window Count : 1 <br> - * Stateful : Yes, volume gets aggregated every window count.</li> - * - * <li> - * <p> - * <b>The operator MinuteVolume:</b> This operator reads from the input port, - * which contains the volume tuples from StockTickInput, and aggregates the data - * to provide the sum of the volume within one minute. Like the operator - * DailyVolume, this operator is also {@code SumKeyVal<String,Long>}, but with - * cumulative set to false. Application Window is set to 1 minute. We will - * explain how to set this later. <br> - * Class : {@link com.datatorrent.lib.math.SumKeyVal} <br> - * Operator App Window Count : 60 (1 Minute) <br> - * Stateful : Yes, aggregate over last 60 windows.</li> - * - * <li> - * <p> - * <b>The operator Quote:</b> This operator has three input ports, which are - * price (from StockTickInput), daily_vol (from Daily Volume), and time (from - * StockTickInput). This operator just consolidates the three data streams and emits - * the consolidated data. It utilizes the class {@code ConsolidatorKeyVal<K>} from - * stream package.<br> - * Class : {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br> - * Operator App Window Count : 1 <br> - * Stateful : No</li> - * - * <li> - * <p> - * <b>The operator Chart:</b> This operator is very similar to the operator - * Quote, except that it takes inputs from High Low and Minute Vol and outputs - * the consolidated tuples to the output port. <br> - * Class : {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br> - * Stateful : No<br> - * Operator App Window Count : 1</li> - * - * - * <li> - * <p> - * <b>The operator PriceSMA:</b> SMA stands for - Simple Moving Average. It - * reads from the input port, which contains the price tuples from - * StockTickInput, and provides the moving average price of the stock. It - * utilizes {@code SimpleMovingAverage<String,Double>}, which is provided in multiwindow - * package. 
SimpleMovingAverage keeps track of the data of the previous N - * application windows in a sliding manner. For each end window event, it - * provides the average of the data in those application windows. <br> - * Class : {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} <br> - * StateFull : Yes, stores values across application window. <br> - * Operator App Window : 1 <br> - * Operator Sliding Window : 300 (5 mins).</li> - * - * <li> - * <p> - * <b>The operator Console: </b> This operator just outputs the input tuples to - * the console (or stdout). In this example, there are four console operators, - * which connect to the output of Quote, Chart, PriceSMA and VolumeSMA. In - * practice, they should be replaced by operators which would do something about - * the data, like drawing charts. </li> - * - * </ul> - * <br> - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = "YahooFinanceDemo") -public class YahooFinanceApplication implements StreamingApplication -{ - protected int streamingWindowSizeMilliSeconds = 1000; // 1 second - protected int appWindowCountMinute = 60; // 1 minute - protected int appWindowCountSMA = 300; // 5 minute - //protected String[] tickers = {"IBM", "GOOG", "AAPL", "YHOO"}; - - /** - * Instantiate stock input operator for actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded price. - * @param name Operator name - * @param dag Application DAG graph. - * @return StockTickInput instance. - */ - public StockTickInput getStockTickInputOperator(String name, DAG dag) - { - StockTickInput oper = dag.addOperator(name, StockTickInput.class); - oper.readIntervalMillis = 200; - //oper.symbols = tickers; - return oper; - } - - /** - * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator - * to sends total daily volume by adding volumes from each ticks. - * @param name Operator name - * @param dag Application DAG graph. - * @return SumKeyVal instance. 
- */ - public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag) - { - SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>()); - oper.setType(Long.class); - oper.setCumulative(true); - return oper; - } - - /** - * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator - * Get aggregated volume of 1 minute and send at the end window of 1 minute. - * @param name Operator name - * @param dag Application DAG graph. - * @param appWindowCount Operator aggregate window size. - * @return SumKeyVal instance. - */ - public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount) - { - SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>()); - oper.setType(Long.class); - dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount); - return oper; - } - - /** - * Instantiate {@link com.datatorrent.lib.math.RangeKeyVal} operator to get high/low - * value for each key within given application window. - * Get High-low range for 1 minute. - * @param name Operator name - * @param dag Application DAG graph. - * @param appWindowCount Operator aggregate window size. - * @return RangeKeyVal instance. - */ - public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount) - { - RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>()); - dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount); - oper.setType(Double.class); - return oper; - } - - /** - * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send - * Quote (Merge price, daily volume, time) - * @param name Operator name - * @param dag Application DAG graph. - * @return ConsolidatorKeyVal instance. 
- */ - public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag) - { - ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>()); - return oper; - } - - /** - * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send - * Chart (Merge minute volume and minute high-low) - * @param name Operator name - * @param dag Application DAG graph. - * @return ConsolidatorKeyVal instance. - */ - public ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> getChartOperator(String name, DAG dag) - { - ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow<Double>,Long,Object,Object,Object>()); - return oper; - } - - /** - * Instantiate {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} to calculate moving average for price - * over given window size. Sliding window size is 1. - * @param name Operator name - * @param dag Application DAG graph. - * @param appWindowCount Operator aggregate window size. - * @return SimpleMovingAverage instance. - */ - public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount) - { - SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>()); - oper.setWindowSize(appWindowCount); - oper.setType(Double.class); - return oper; - } - - /** - * Get console for output operator. - * @param name Operator name - * @param dag Application DAG graph. - * @return input port for console output. 
- */ - public InputPort<Object> getConsole(String name, /*String nodeName,*/ DAG dag, String prefix) - { - // hack to output to HTTP based on actual environment - /* - String serverAddr = System.getenv("MALHAR_AJAXSERVER_ADDRESS"); - if (serverAddr != null) { - HttpOutputOperator<Object> oper = dag.addOperator(name, new HttpOutputOperator<Object>()); - oper.setResourceURL(URI.create("http://" + serverAddr + "/channel/" + nodeName)); - return oper.input; - } - */ - - ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class); - oper.setStringFormat(prefix + ": %s"); - return oper.input; - } - - /** - * Populate Yahoo Finance Demo Application DAG. - */ - @SuppressWarnings("unchecked") - @Override - public void populateDAG(DAG dag, Configuration conf) - { - - dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds); - - StockTickInput tick = getStockTickInputOperator("StockTickInput", dag); - SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag); - ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag); - - RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute); - SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute); - ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> chartOperator = getChartOperator("Chart", dag); - - SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA); - DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>(); - dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec); - dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec); - dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data); - dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data); - 
dag.addStream("time", tick.time, quoteOperator.in3); - dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2); - - dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE")); - - dag.addStream("high_low", highlow.range, chartOperator.in1); - dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2); - dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART")); - - dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA")); - - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java deleted file mode 100644 index cf3801e..0000000 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.yahoofinance; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.httpclient.HttpClient; -import org.apache.commons.httpclient.HttpStatus; -import org.apache.commons.httpclient.cookie.CookiePolicy; -import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.commons.httpclient.params.DefaultHttpParams; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.io.SimpleSinglePortInputOperator; - -import au.com.bytecode.opencsv.CSVReader; - -/** - * Grabs Yahoo Finance quotes data and emits HashMap, with key equals the format name (e.g. "s0") <p> - * - * @since 0.3.2 - */ -public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<HashMap<String, Object>> implements Runnable -{ - private static final Logger logger = LoggerFactory.getLogger(YahooFinanceCSVInputOperator.class); - /** - * Timeout interval for reading from server. 0 or negative indicates no timeout. - */ - private int readIntervalMillis = 500; - - /** - * The URL of the web service resource for the POST request. 
- */ - private String url; - private transient HttpClient client; - private transient GetMethod method; - - private ArrayList<String> symbolList = new ArrayList<String>(); - private ArrayList<String> parameterList = new ArrayList<String>(); - - public void addSymbol(String symbol) - { - symbolList.add(symbol); - } - - public void addFormat(String format) - { - parameterList.add(format); - } - - public ArrayList<String> getSymbolList() - { - return symbolList; - } - - public ArrayList<String> getParameterList() - { - return parameterList; - } - - public int getReadIntervalMillis() - { - return readIntervalMillis; - } - - public void setReadIntervalMillis(int readIntervalMillis) - { - this.readIntervalMillis = readIntervalMillis; - } - - /** - * Prepare URL from symbols and parameters. - * URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=GOOG,FB,YHOO&f=sl1vt1&e=.csv - * @return - */ - private String prepareURL() - { - String str = "http://download.finance.yahoo.com/d/quotes.csv?"; - - str += "s="; - for (int i = 0; i < symbolList.size(); i++) { - if (i != 0) { - str += ","; - } - str += symbolList.get(i); - } - str += "&f="; - for (String format: parameterList) { - str += format; - } - str += "&e=.csv"; - return str; - } - - @Override - public void setup(OperatorContext context) - { - url = prepareURL(); - client = new HttpClient(); - method = new GetMethod(url); - DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); - } - - @Override - public void run() - { - while (true) { - try { - int statusCode = client.executeMethod(method); - if (statusCode != HttpStatus.SC_OK) { - logger.error("Method failed: " + method.getStatusLine()); - } else { - InputStream istream; - istream = method.getResponseBodyAsStream(); - // Process response - InputStreamReader isr = new InputStreamReader(istream); - CSVReader reader = new CSVReader(isr); - List<String[]> myEntries; - myEntries = 
reader.readAll(); - for (String[] stringArr: myEntries) { - HashMap<String,Object> hm = new HashMap<String,Object>(); - for (int i = 0; i < parameterList.size(); i++) { - hm.put(parameterList.get(i), stringArr[i]); - } - outputPort.emit(hm); // send out one symbol at a time - } - } - Thread.sleep(readIntervalMillis); - } catch (InterruptedException ex) { - logger.debug(ex.toString()); - } catch (IOException ex) { - logger.debug(ex.toString()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif deleted file mode 100644 index 8fc8331..0000000 Binary files a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/doc-files/Application.gif and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java ---------------------------------------------------------------------- diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java deleted file mode 100644 index 992bc30..0000000 --- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Yahoo Finance demonstration applications. - */ -package com.datatorrent.demos.yahoofinance;
