Repository: apex-malhar Updated Branches: refs/heads/master 4b36bf3e5 -> d5bf96cac
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/LineReader.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/LineReader.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/LineReader.java new file mode 100644 index 0000000..25336ae --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/LineReader.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.wordcount; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +/** + * Reads lines from input file and returns them. 
If EOF is reached, a control tuple + * is emitted on the control port + * + * @since 3.2.0 + */ +public class LineReader extends AbstractFileInputOperator<String> +{ + private static final Logger LOG = LoggerFactory.getLogger(LineReader.class); + + /** + * Output port on which lines from current file name are emitted + */ + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + /** + * Control port on which the current file name is emitted to indicate EOF + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); + + private transient BufferedReader br = null; + + private Path path; + + /** + * File open callback; wrap the file input stream in a buffered reader for reading lines + * @param curPath The path to the file just opened + */ + @Override + protected InputStream openFile(Path curPath) throws IOException + { + LOG.info("openFile: curPath = {}", curPath); + path = curPath; + InputStream is = super.openFile(path); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + /** + * File close callback; close buffered reader + * @param is File input stream that will imminently be closed + */ + @Override + protected void closeFile(InputStream is) throws IOException + { + super.closeFile(is); + br.close(); + br = null; + path = null; + } + + /** + * {@inheritDoc} + * If we hit EOF, emit file name on control port + */ + @Override + protected String readEntity() throws IOException + { + // try to read a line + final String line = br.readLine(); + if (null != line) { // common case + LOG.debug("readEntity: line = {}", line); + return line; + } + + // end-of-file; send control tuple, containing only the last component of the path + // (only file name) on control port + // + if (control.isConnected()) { + LOG.info("readEntity: EOF for {}", path); + final String name = path.getName(); // final component of path + control.emit(name); + } + + 
return null; + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WCPair.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WCPair.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WCPair.java new file mode 100644 index 0000000..a960695 --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WCPair.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.wordcount; + +/** + * A single (word, frequency) pair + * + * @since 3.2.0 + */ +public class WCPair +{ + /** + * The word + */ + public String word; + + /** + * The frequency + */ + public int freq; + + /** + * Default constructor + */ + public WCPair() + { + + } + + /** + * Create new object with given values + * @param w The word + * @param f The frequency + */ + public WCPair(String w, int f) + { + word = w; + freq = f; + } + + @Override + public String toString() + { + return String.format("(%s, %d)", word, freq); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WindowWordCount.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WindowWordCount.java new file mode 100644 index 0000000..09c3921 --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WindowWordCount.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.wordcount; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Computes word frequencies per window and emits them at each {@code endWindow()}. The output is a + * list of (word, frequency) pairs + * + * @since 3.2.0 + */ +public class WindowWordCount extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class); + + /** {@literal (word => frequency)} map for current window */ + protected Map<String, WCPair> wordMap = new HashMap<>(); + + /** + * Input port on which words are received + */ + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + @Override + public void process(String word) + { + WCPair pair = wordMap.get(word); + if (null != pair) { // word seen previously + pair.freq += 1; + return; + } + + // new word + pair = new WCPair(); + pair.word = word; + pair.freq = 1; + wordMap.put(word, pair); + } + }; + + /** + * Output port which emits the list of word frequencies for current window + */ + public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>(); + + /** + * {@inheritDoc} + * If we've seen some words in this window, emit the map and clear it for next window + */ + @Override + public void endWindow() + { + LOG.info("WindowWordCount: endWindow"); + + // got EOF; if no words found, do nothing + if (wordMap.isEmpty()) { + return; + } + + // have some words; emit single map and reset for next file + final ArrayList<WCPair> list = new ArrayList<>(wordMap.values()); + output.emit(list); + list.clear(); + wordMap.clear(); + } + +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountInputOperator.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountInputOperator.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountInputOperator.java new file mode 100644 index 0000000..d5fc66a --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountInputOperator.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.wordcount; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.io.SimpleSinglePortInputOperator; + +/** + * <p>WordCountInputOperator class.</p> + * + * @since 0.3.2 + */ +public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable +{ + + private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class); + protected long averageSleep = 300; + protected long sleepPlusMinus = 100; + protected String fileName = "com/datatorrent/examples/wordcount/samplefile.txt"; + + public void setAverageSleep(long as) + { + averageSleep = as; + } + + public void setSleepPlusMinus(long spm) + { + sleepPlusMinus = spm; + } + + public void setFileName(String fn) + { + fileName = fn; + } + + @Override + public void run() + { + BufferedReader br = null; + DataInputStream in = null; + InputStream fstream = null; + + while (true) { + try { + String line; + fstream = this.getClass().getClassLoader().getResourceAsStream(fileName); + + in = new DataInputStream(fstream); + br = new BufferedReader(new InputStreamReader(in)); + + while ((line = br.readLine()) != null) { + String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'ââ]+"); + for (String word : words) { + word = word.trim().toLowerCase(); + if (!word.isEmpty()) { + outputPort.emit(word); + } + } + try { + Thread.sleep(averageSleep + (new Double(sleepPlusMinus * (Math.random() * 2 - 1))).longValue()); + } catch (InterruptedException ex) { + // nothing + } + } + + } catch (IOException ex) { + logger.debug(ex.toString()); + } finally { + try { + if (br != null) { + br.close(); + } + if (in != null) { + in.close(); + } + if (fstream != null) { + fstream.close(); + } + } catch (IOException exc) { + // nothing + } + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountWriter.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountWriter.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountWriter.java new file mode 100644 index 0000000..e4ee35e --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordCountWriter.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.wordcount; + +import java.io.UnsupportedEncodingException; + +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Write top N words and their frequencies to a file + * + * @since 3.2.0 + */ +public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>> +{ + private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class); + private static final String charsetName = "UTF-8"; + private static final String nl = System.lineSeparator(); + + private String fileName; // current file name + private final transient StringBuilder sb = new StringBuilder(); + + /** + * {@inheritDoc} + * Invoke requestFinalize() to create the output file with the desired name without decorations. + */ + @Override + public void endWindow() + { + if (null != fileName) { + requestFinalize(fileName); + } + super.endWindow(); + } + + /** + * Extracts file name from argument + * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs} + * @return the file name to write the tuple to + */ + @Override + protected String getFileName(Map<String, Object> tuple) + { + LOG.info("getFileName: tuple.size = {}", tuple.size()); + + final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next(); + fileName = entry.getKey(); + LOG.info("getFileName: fileName = {}", fileName); + return fileName; + } + + /** + * Extracts output file content from argument + * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs} + * @return input tuple converted to an array of bytes + */ + @Override + protected byte[] getBytesForTuple(Map<String, Object> tuple) + { + LOG.info("getBytesForTuple: tuple.size = {}", tuple.size()); + + // get first and only pair; key is the fileName and is ignored here + final Map.Entry<String, Object> 
entry = tuple.entrySet().iterator().next(); + final List<WCPair> list = (List<WCPair>)entry.getValue(); + + if (sb.length() > 0) { // clear buffer + sb.delete(0, sb.length()); + } + + for ( WCPair pair : list ) { + sb.append(pair.word); + sb.append(" : "); + sb.append(pair.freq); + sb.append(nl); + } + + final String data = sb.toString(); + LOG.info("getBytesForTuple: data = {}", data); + try { + final byte[] result = data.getBytes(charsetName); + return result; + } catch (UnsupportedEncodingException ex) { + throw new RuntimeException("Should never get here", ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordReader.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordReader.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordReader.java new file mode 100644 index 0000000..663d399 --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/WordReader.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.wordcount; + +import java.util.regex.Pattern; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Extracts words from input line + * + * @since 3.3.0 + */ +public class WordReader extends BaseOperator +{ + // default pattern for word-separators + private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+"); + + private String nonWordStr; // configurable regex + private transient Pattern nonWord; // compiled regex + + /** + * Output port on which words from the current file are emitted + */ + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + /** + * Input port on which lines from the current file are received + */ + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + + @Override + public void process(String line) + { + // line; split it into words and emit them + final String[] words = nonWord.split(line); + for (String word : words) { + if (word.isEmpty()) { + continue; + } + output.emit(word); + } + } + }; + + /** + * Returns the regular expression that matches strings between words + * @return Regular expression for strings that separate words + */ + public String getNonWordStr() + { + return nonWordStr; + } + + /** + * Sets the regular expression that matches strings between words + * @param regex New regular expression for strings that separate words + */ + public void setNonWordStr(String regex) + { + nonWordStr = regex; + } + + /** + * {@inheritDoc} + * Set nonWord to the default pattern if necessary + */ + @Override + public void setup(OperatorContext context) + { + if (null == nonWordStr) { + nonWord = nonWordDefault; + } else { + nonWord = Pattern.compile(nonWordStr); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/doc-files/UniqueWordCounter.jpg ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/doc-files/UniqueWordCounter.jpg b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/doc-files/UniqueWordCounter.jpg new file mode 100644 index 0000000..054baed Binary files /dev/null and b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/doc-files/UniqueWordCounter.jpg differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/package-info.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/package-info.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/package-info.java new file mode 100644 index 0000000..c394d0e --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Streaming word count demonstration application. + */ +package org.apache.apex.examples.wordcount; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/resources/META-INF/properties.xml b/examples/wordcount/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..3d9e356 --- /dev/null +++ b/examples/wordcount/src/main/resources/META-INF/properties.xml @@ -0,0 +1,98 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- TopNWordsWithQueries --> + + <!-- for debugging --> + <!-- + <property> + <name>dt.attr.CONTAINER_JVM_OPTIONS</name> + <value>-Dlog4j.configuration=my_log4j.properties</value> + </property> + --> + + <!-- monitored input directory --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.lineReader.directory</name> + <value>/tmp/test/input-dir</value> + </property> + + <!-- regular expression for word separator --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wordReader.nonWordStr</name> + <value>[\p{Punct}\s]+</value> + </property> + + <!-- output directory for word counts --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wcWriter.filePath</name> + <value>/tmp/test/output-dir</value> + </property> + + <!-- Top N value --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.fileWordCount.topN</name> + <value>10</value> + </property> + + <!-- topic for queries (current file) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.snapshotServerFile.embeddableQueryInfoProvider.topic</name> + <value>TopNWordsQueryFile</value> + </property> + + <!-- topic for query results (current file) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wsResultFile.topic</name> + <value>TopNWordsQueryFileResult</value> + </property> + + <!-- topic for queries (global) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.snapshotServerGlobal.embeddableQueryInfoProvider.topic</name> + <value>TopNWordsQueryGlobal</value> + </property> + + <!-- topic for query results (global) --> + <property> + <name>dt.application.TopNWordsWithQueries.operator.wsResultGlobal.topic</name> + <value>TopNWordsQueryGlobalResult</value> + </property> + + <!-- retry count --> + <property> + <name>dt.application.TwitterExample.operator.wsResult.numRetries</name> + <value>2147483647</value> + </property> + + + <!-- WordCountExample --> + <property> + 
<name>dt.application.WordCountExample.operator.wordinput.fileName</name> + <value>samplefile.txt</value> + </property> + <property> + <name>dt.application.WordCountExample.stream.wordinput.count.locality</name> + <value>CONTAINER_LOCAL</value> + <description>Specify container locality for the viewtuplecount stream + </description> + </property> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/resources/WordDataSchema.json ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/resources/WordDataSchema.json b/examples/wordcount/src/main/resources/WordDataSchema.json new file mode 100644 index 0000000..5e8e7c0 --- /dev/null +++ b/examples/wordcount/src/main/resources/WordDataSchema.json @@ -0,0 +1,4 @@ +{ + "values": [{"name": "word", "type": "string"}, + {"name": "count", "type": "integer"}] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/resources/org/apache/apex/examples/wordcount/samplefile.txt ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/resources/org/apache/apex/examples/wordcount/samplefile.txt b/examples/wordcount/src/main/resources/org/apache/apex/examples/wordcount/samplefile.txt new file mode 100644 index 0000000..83eaaed --- /dev/null +++ b/examples/wordcount/src/main/resources/org/apache/apex/examples/wordcount/samplefile.txt @@ -0,0 +1 @@ +CONTENT DELETED http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/resources/samplefile.txt ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/resources/samplefile.txt b/examples/wordcount/src/main/resources/samplefile.txt new file mode 100644 index 0000000..02a5e70 --- /dev/null +++ b/examples/wordcount/src/main/resources/samplefile.txt @@ -0,0 +1,2 @@ +CONTENT DELETED + 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/test/java/org/apache/apex/examples/wordcount/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/test/java/org/apache/apex/examples/wordcount/ApplicationTest.java b/examples/wordcount/src/test/java/org/apache/apex/examples/wordcount/ApplicationTest.java new file mode 100644 index 0000000..f245fb8 --- /dev/null +++ b/examples/wordcount/src/test/java/org/apache/apex/examples/wordcount/ApplicationTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.wordcount; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +/** + * + */ +public class ApplicationTest +{ + private final transient Logger LOG = LoggerFactory.getLogger(ApplicationTest.class); + public ApplicationTest() + { + } + + @Test + public void testSomeMethod() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-wordcount.xml"); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + long start = System.currentTimeMillis(); + lc.run(300000); + long end = System.currentTimeMillis(); + long time = end - start; + LOG.debug("Test used " + time + " ms"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/test/resources/dt-site-wordcount.xml ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/test/resources/dt-site-wordcount.xml b/examples/wordcount/src/test/resources/dt-site-wordcount.xml new file mode 100644 index 0000000..b7373e8 --- /dev/null +++ b/examples/wordcount/src/test/resources/dt-site-wordcount.xml @@ -0,0 +1,37 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <property> + <name>dt.application.WordCountExample.class</name> + <value>org.apache.apex.examples.wordcount.Application</value> + <description>An alias for the application</description> + </property> + <property> + <name>dt.application.WordCountExample.operator.wordinput.fileName</name> + <value>samplefile.txt</value> + </property> + <property> + <name>dt.application.WordCountExample.stream.wordinput.count.locality</name> + <value>CONTAINER_LOCAL</value> + <description>Specify container locality for the viewtuplecount stream + </description> + </property> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/test/resources/log4j.properties b/examples/wordcount/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/wordcount/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/pom.xml ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/pom.xml b/examples/yahoofinance/pom.xml new file mode 100644 index 0000000..c607198 --- /dev/null +++ b/examples/yahoofinance/pom.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-yahoo-finance</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Yahoo! 
Finance Example</name> + <description>Apex example applications that get Yahoo finance feed and calculate minute price range, minute volume and simple moving average.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>net.sf.opencsv</groupId> + <artifactId>opencsv</artifactId> + <version>2.0</version> + </dependency> + <dependency> + <groupId>org.apache.derby</groupId> + <artifactId>derby</artifactId> + <version>10.9.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/assemble/appPackage.xml b/examples/yahoofinance/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/yahoofinance/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQL.java ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQL.java 
b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQL.java new file mode 100644 index 0000000..517ef68 --- /dev/null +++ b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQL.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.apex.examples.yahoofinance;
+
+import org.apache.apex.malhar.contrib.misc.streamquery.AbstractSqlStreamOperator;
+import org.apache.apex.malhar.contrib.misc.streamquery.DerbySqlStreamOperator;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Example that feeds two Yahoo! Finance CSV inputs into an embedded Derby SQL operator and prints
+ * the price/earnings and price/book ratio of each stock to the console.
+ * <p>
+ * The ticker symbols are read from the configuration property
+ * {@code org.apache.apex.examples.yahoofinance.ApplicationWithDerbySQL.tickerSymbols}
+ * (comma-separated; surrounding whitespace and empty entries are ignored) and default to
+ * {@code YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM}.
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "YahooFinanceWithDerbySQLExample")
+public class ApplicationWithDerbySQL implements StreamingApplication
+{
+  /** Configuration key holding the comma-separated list of ticker symbols. */
+  private static final String TICKER_SYMBOLS_PROPERTY = ApplicationWithDerbySQL.class.getName() + ".tickerSymbols";
+  /** Ticker symbols used when {@link #TICKER_SYMBOLS_PROPERTY} is not set. */
+  private static final String DEFAULT_TICKER_SYMBOLS = "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    String symbolStr = conf.get(TICKER_SYMBOLS_PROPERTY, DEFAULT_TICKER_SYMBOLS);
+
+    YahooFinanceCSVInputOperator input1 = dag.addOperator("input1", new YahooFinanceCSVInputOperator());
+    YahooFinanceCSVInputOperator input2 = dag.addOperator("input2", new YahooFinanceCSVInputOperator());
+    DerbySqlStreamOperator sqlOper = dag.addOperator("sqlOper", new DerbySqlStreamOperator());
+    ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator());
+
+    for (String rawSymbol : symbolStr.split(",")) {
+      String symbol = rawSymbol.trim();
+      // tolerate "A, B" style lists and stray commas instead of registering blank symbols
+      if (symbol.isEmpty()) {
+        continue;
+      }
+      input1.addSymbol(symbol);
+      input2.addSymbol(symbol);
+    }
+    // input1: symbol + last trade; input2: symbol + EPS + book value
+    input1.addFormat("s0");
+    input1.addFormat("l1");
+    input2.addFormat("s0");
+    input2.addFormat("e0");
+    input2.addFormat("b4");
+
+    AbstractSqlStreamOperator.InputSchema inputSchema1 = new AbstractSqlStreamOperator.InputSchema("t1");
+    AbstractSqlStreamOperator.InputSchema inputSchema2 = new AbstractSqlStreamOperator.InputSchema("t2");
+    inputSchema1.setColumnInfo("s0", "varchar(100)", true); // symbol
+    inputSchema1.setColumnInfo("l1", "float", false);  // last trade
+    inputSchema2.setColumnInfo("s0", "varchar(100)", true); // symbol
+    inputSchema2.setColumnInfo("e0", "float", false);  // EPS
+    inputSchema2.setColumnInfo("b4", "float", false);  // Book value
+
+    sqlOper.setInputSchema(0, inputSchema1);
+    sqlOper.setInputSchema(1, inputSchema2);
+
+    // Calculate PE Ratio and PB Ratio using SQL
+    sqlOper.addExecStatementString("SELECT SESSION.t1.s0 AS symbol, SESSION.t1.l1 / SESSION.t2.e0 AS pe_ratio, SESSION.t1.l1 / SESSION.t2.b4 AS pb_ratio FROM SESSION.t1,SESSION.t2 WHERE SESSION.t1.s0 = SESSION.t2.s0");
+
+    dag.addStream("input1_sql", input1.outputPort, sqlOper.in1);
+    dag.addStream("input2_sql", input2.outputPort, sqlOper.in2);
+
+    dag.addStream("result_console", sqlOper.result, consoleOperator.input);
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/StockTickInput.java
----------------------------------------------------------------------
diff --git a/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/StockTickInput.java b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/StockTickInput.java
new file mode 100644
index 0000000..cd31e03
--- /dev/null
+++ b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/StockTickInput.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.yahoofinance;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.cookie.CookiePolicy;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.DefaultHttpParams;
+import org.apache.hadoop.util.StringUtils;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+/**
+ * Polls the Yahoo! Finance CSV quote service for the configured ticker symbols and emits, per
+ * symbol, the last trade price, the incremental traded volume and the last trade time on three
+ * separate output ports.
+ * <p>
+ * The first quote emitted for a symbol carries its total volume; later quotes carry only the
+ * volume traded since the previously emitted quote.
+ *
+ * @since 0.3.2
+ */
+public class StockTickInput implements InputOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
+  /**
+   * Pause between two consecutive polls of the quote service, in milliseconds.
+   * 0 or negative disables the pause.
+   */
+  public int readIntervalMillis = 500;
+  /**
+   * The URL of the web service resource for the GET request; built from {@code tickers} in setup.
+   */
+  private String url;
+  private String[] symbols;
+  @NotNull
+  private String tickers;
+  private transient HttpClient client;
+  private transient GetMethod method;
+  /** Last total volume emitted per symbol; used to turn totals into incremental volume. */
+  private HashMap<String, Long> lastVolume = new HashMap<String, Long>();
+  private boolean outputEvenIfZeroVolume = false;
+  /**
+   * The output port to emit price.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<String, Double>> price = new DefaultOutputPort<KeyValPair<String, Double>>();
+  /**
+   * The output port to emit incremental volume.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<String, Long>> volume = new DefaultOutputPort<KeyValPair<String, Long>>();
+  /**
+   * The output port to emit last traded time.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<String, String>> time = new DefaultOutputPort<KeyValPair<String, String>>();
+
+  /**
+   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&f=sl1vt1
+   *
+   * @return the URL
+   */
+  private String prepareURL()
+  {
+    StringBuilder sb = new StringBuilder("http://download.finance.yahoo.com/d/quotes.csv?s=");
+    for (int i = 0; i < symbols.length; i++) {
+      if (i != 0) {
+        sb.append(',');
+      }
+      sb.append(symbols[i]);
+    }
+    // f=sl1vt1: symbol, last trade price, volume, last trade time
+    sb.append("&f=sl1vt1&e=.csv");
+    return sb.toString();
+  }
+
+  /**
+   * Builds the quote URL from the configured tickers and creates the HTTP client together with
+   * the GET method that is reused for every poll.
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    url = prepareURL();
+    client = new HttpClient();
+    method = new GetMethod(url);
+    DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  /**
+   * Fetches one batch of quotes, emits the tuples derived from it and then pauses for
+   * {@link #readIntervalMillis}. I/O failures are logged and the pause still applies, so an
+   * unreachable service is not retried in a busy loop.
+   */
+  @Override
+  public void emitTuples()
+  {
+    try {
+      fetchAndEmitQuotes();
+    } catch (IOException ex) {
+      logger.debug("Failed to read quotes from {}", url, ex);
+    }
+
+    if (readIntervalMillis > 0) {
+      try {
+        Thread.sleep(readIntervalMillis);
+      } catch (InterruptedException ex) {
+        logger.debug(ex.toString());
+        // restore the interrupt status so the calling thread can observe the interruption
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Executes the prepared GET request and emits tuples for every valid CSV row of the response.
+   * The HTTP connection is always released so the reused {@link GetMethod} can run again.
+   *
+   * @throws IOException if executing the request or reading the response fails
+   */
+  private void fetchAndEmitQuotes() throws IOException
+  {
+    try {
+      int statusCode = client.executeMethod(method);
+      if (statusCode != HttpStatus.SC_OK) {
+        logger.error("Method failed: {}", method.getStatusLine());
+        return;
+      }
+      InputStream istream = method.getResponseBodyAsStream();
+      if (istream == null) {
+        return; // response has no body
+      }
+      // decode with the charset announced by the server rather than the platform default
+      CSVReader reader = new CSVReader(new InputStreamReader(istream, method.getResponseCharSet()));
+      try {
+        List<String[]> rows = reader.readAll();
+        for (String[] row : rows) {
+          emitRow(row);
+        }
+      } finally {
+        reader.close();
+      }
+    } finally {
+      method.releaseConnection();
+    }
+  }
+
+  /**
+   * Emits price, incremental volume and time for one CSV row of the form
+   * {@code <Symbol>,<Price>,<Volume>,<Time>}. A row with a different number of fields, or with a
+   * non-numeric price or volume, is skipped without affecting the remaining rows.
+   *
+   * @param row the fields of one CSV line
+   */
+  private void emitRow(String[] row)
+  {
+    ArrayList<String> tuple = new ArrayList<String>(Arrays.asList(row));
+    if (tuple.size() != 4) {
+      logger.debug("Skipping malformed quote row: {}", tuple);
+      return;
+    }
+    // input csv is <Symbol>,<Price>,<Volume>,<Time>
+    String symbol = tuple.get(0);
+    double currentPrice;
+    long currentVolume;
+    try {
+      currentPrice = Double.parseDouble(tuple.get(1));
+      currentVolume = Long.parseLong(tuple.get(2));
+    } catch (NumberFormatException ex) {
+      // e.g. a placeholder value instead of a number; drop only this row
+      logger.debug("Skipping quote row with non-numeric price or volume: {}", tuple);
+      return;
+    }
+    String timeStamp = tuple.get(3);
+    long vol = currentVolume;
+    // Sends total volume in first tick, and incremental volume afterwards.
+    Long previousVolume = lastVolume.get(symbol);
+    if (previousVolume != null) {
+      vol -= previousVolume;
+    }
+
+    if (vol > 0 || outputEvenIfZeroVolume) {
+      price.emit(new KeyValPair<String, Double>(symbol, currentPrice));
+      volume.emit(new KeyValPair<String, Long>(symbol, vol));
+      time.emit(new KeyValPair<String, String>(symbol, timeStamp));
+      lastVolume.put(symbol, currentVolume);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  /**
+   * @param outputEvenIfZeroVolume if true, quotes are emitted even when no volume was traded
+   *                               since the previously emitted quote
+   */
+  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
+  {
+    this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
+  }
+
+  /**
+   * @param tickers comma-separated ticker symbols to poll, e.g. {@code IBM,GOOG,AAPL}
+   */
+  public void setTickers(String tickers)
+  {
+    this.tickers = tickers;
+    symbols = StringUtils.split(tickers, ',');
+  }
+
+  public String getTickers()
+  {
+    return tickers;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceApplication.java
----------------------------------------------------------------------
diff --git a/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceApplication.java b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceApplication.java
new file mode 100644
index 0000000..4cc323e
--- /dev/null
+++ b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceApplication.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.yahoofinance;
+
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.math.RangeKeyVal;
+import com.datatorrent.lib.math.SumKeyVal;
+import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.stream.ConsolidatorKeyVal;
+import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec;
+import com.datatorrent.lib.util.HighLow;
+
+/**
+ * Yahoo! Finance Application Example:<br>
+ * Gets the Yahoo! Finance feed and calculates the per-minute price range, the per-minute volume
+ * and the simple moving average of the price over 5 minutes. <br>
+ * <br>
+ * Functional Description: <br>
+ * The application samples the Yahoo! Finance ticker every 200 ms. All data points in one
+ * second are streamed from the input adapter. <br>
+ * <br>
+ *
+ * The application calculates the following real-time values:<br>
+ * <ul>
+ * <li>Quotes (price/volume/time) for the configured stocks (e.g. IBM, GOOG, AAPL, YHOO),
+ * displayed every second.</li>
+ * <li>Charts of high/low price vs. volume over the last minute for each stock.</li>
+ * <li>Simple moving average of the price over the last 5 minutes for each stock.</li>
+ * </ul>
+ * <br>
+ * <br>
+ *
+ * Custom Attributes: <br>
+ * <ul>
+ * <li>Application streaming window size (STREAMING_WINDOW_SIZE_MILLIS) = 1 sec,
+ * since we are only interested in quotes every second.</li>
+ * <li>HighLow/MinuteVolume operators' application window count (APPLICATION_WINDOW_COUNT) =
+ * 60, i.e. aggregate over one minute.</li>
+ * <li>PriceSMA operator window size: 300, i.e. a sliding average over the last 5 minutes.</li>
+ * </ul>
+ * <br>
+ *
+ * Input Adapter: <br>
+ * The {@link StockTickInput} operator fetches real-time stock quotes from Yahoo! Finance and
+ * pushes them into the application. <br>
+ * <br>
+ *
+ * Output Adapter: <br>
+ * Output values are written to the console through {@link ConsoleOutputOperator}.<br>
+ * To write to HDFS, HTTP, etc. instead of the console, replace the console operators with a
+ * suitable output operator from the Malhar library (see the {@code com.datatorrent.lib.io}
+ * and {@code com.datatorrent.lib.io.fs} packages). <br>
+ * <br>
+ *
+ * Run Sample Application: <br>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ * LocalMode.runApp(new YahooFinanceApplication(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success: <br>
+ * For a successful deployment and run, the user should see output like the following on the
+ * console:
+ *
+ * <pre>
+ * Price SMA: AAPL=435.965
+ * Price SMA: GOOG=877.0
+ * QUOTE: {YHOO=[26.37, 9760360, 4:00pm, null, null], IBM=[203.77, 2899698, 4:00pm, null, null], GOOG=[877.0, 2069614, 4:00pm, null, null], AAPL=[435.965, 10208099, 4:00pm, null, null]}
+ * Price SMA: YHOO=26.37
+ * </pre>
+ *
+ * Scaling Options: <br>
+ * <ul>
+ * <li>The volume operator can be replicated using a {@code StatelessPartitioner}
+ * on the operator.</li>
+ * <li>The range (HighLow) operator can be replicated, but only with a proper unifier
+ * operator (see the App Dev Guide).</li>
+ * <li>The sliding window operator can be replicated with a proper unifier operator.</li>
+ * </ul>
+ * <br>
+ *
+ * Application DAG: <br>
+ * <img src="doc-files/Application.gif" width="600" alt="Application DAG"> <br>
+ * <br>
+ *
+ * Streaming Window Size: 1000 ms (1 sec) <br>
+ * Operator Details: <br>
+ * <ul>
+ * <li>
+ * <p>
+ * <b>The operator DailyVolume:</b> This operator reads from the input port,
+ * which contains the incremental volume tuples from StockTickInput, and
+ * aggregates the data to provide the cumulative volume. It just utilizes the
+ * library class {@code SumKeyVal<K,V>} provided in the math package. In this case,
+ * {@code SumKeyVal<String,Long>}, where K is the stock symbol, V is the aggregated
+ * volume, with cumulative set to true. (Otherwise if cumulative was set to
+ * false, SumKeyVal would provide the sum for the application window.) The platform
+ * provides a number of built-in operators for simple operations like this so
+ * that application developers do not have to write them. More examples to
+ * follow. This operator assumes that the application restarts before the market
+ * opens every day.
+ * </p>
+ * Class: {@link com.datatorrent.lib.math.SumKeyVal} <br>
+ * Operator Application Window Count: 1 <br>
+ * Stateful: Yes, volume gets aggregated every window count.</li>
+ *
+ * <li>
+ * <p>
+ * <b>The operator MinuteVolume:</b> This operator reads from the input port,
+ * which contains the volume tuples from StockTickInput, and aggregates the data
+ * to provide the sum of the volume within one minute. Like the operator
+ * DailyVolume, this operator is also {@code SumKeyVal<String,Long>}, but with
+ * cumulative set to false. The application window is set to 1 minute (see
+ * {@link #getMinuteVolumeOperator(String, DAG, int)}). <br>
+ * Class: {@link com.datatorrent.lib.math.SumKeyVal} <br>
+ * Operator App Window Count: 60 (1 minute) <br>
+ * Stateful: Yes, aggregates over the last 60 windows.</li>
+ *
+ * <li>
+ * <p>
+ * <b>The operator Quote:</b> This operator has three input ports, which are
+ * price (from StockTickInput), daily_vol (from DailyVolume), and time (from
+ * StockTickInput). This operator just consolidates the three data streams and emits
+ * the consolidated data. It utilizes the class {@code ConsolidatorKeyVal} from the
+ * stream package.<br>
+ * Class: {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br>
+ * Operator App Window Count: 1 <br>
+ * Stateful: No</li>
+ *
+ * <li>
+ * <p>
+ * <b>The operator Chart:</b> This operator is very similar to the operator
+ * Quote, except that it takes inputs from HighLow and MinuteVolume and outputs
+ * the consolidated tuples to the output port. <br>
+ * Class: {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} <br>
+ * Stateful: No<br>
+ * Operator App Window Count: 1</li>
+ *
+ * <li>
+ * <p>
+ * <b>The operator PriceSMA:</b> SMA stands for Simple Moving Average. It
+ * reads from the input port, which contains the price tuples from
+ * StockTickInput, and provides the moving average price of the stock. It
+ * utilizes {@code SimpleMovingAverage<String,Double>}, which is provided in the multiwindow
+ * package. SimpleMovingAverage keeps track of the data of the previous N
+ * application windows in a sliding manner. For each end window event, it
+ * provides the average of the data in those application windows. <br>
+ * Class: {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} <br>
+ * Stateful: Yes, stores values across application windows. <br>
+ * Operator App Window: 1 <br>
+ * Operator Sliding Window: 300 (5 mins).</li>
+ *
+ * <li>
+ * <p>
+ * <b>The operator Console:</b> This operator just outputs the input tuples to
+ * the console (or stdout). In this example, there are three console operators,
+ * which connect to the output of Quote, Chart and PriceSMA. In
+ * practice, they should be replaced by operators which would do something about
+ * the data, like drawing charts.</li>
+ *
+ * </ul>
+ * <br>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "YahooFinanceExample")
+public class YahooFinanceApplication implements StreamingApplication
+{
+  protected int streamingWindowSizeMilliSeconds = 1000; // 1 second
+  protected int appWindowCountMinute = 60;   // 1 minute
+  protected int appWindowCountSMA = 300;  // 5 minutes
+
+  /**
+   * Instantiate the stock input operator for actual Yahoo! Finance ticks of symbol, last price,
+   * total daily volume, and last traded time. The poll interval is set to 200 ms.
+   * <p>
+   * The ticker symbols are not set here; they must be supplied through the operator's
+   * {@code tickers} property (see {@link StockTickInput#setTickers(String)}).
+   *
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @return StockTickInput instance.
+   */
+  public StockTickInput getStockTickInputOperator(String name, DAG dag)
+  {
+    StockTickInput oper = dag.addOperator(name, StockTickInput.class);
+    oper.readIntervalMillis = 200;
+    return oper;
+  }
+
+  /**
+   * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator
+   * to send the total daily volume by adding the volumes from each tick.
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @return SumKeyVal instance.
+   */
+  public SumKeyVal<String, Long> getDailyVolumeOperator(String name, DAG dag)
+  {
+    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
+    oper.setType(Long.class);
+    oper.setCumulative(true);
+    return oper;
+  }
+
+  /**
+   * Instantiate {@link com.datatorrent.lib.math.SumKeyVal} operator to
+   * get the aggregated volume of 1 minute and send it at the end window of that minute.
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @param appWindowCount Operator aggregate window size.
+   * @return SumKeyVal instance.
+   */
+  public SumKeyVal<String, Long> getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
+  {
+    SumKeyVal<String, Long> oper = dag.addOperator(name, new SumKeyVal<String, Long>());
+    oper.setType(Long.class);
+    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
+    return oper;
+  }
+
+  /**
+   * Instantiate {@link com.datatorrent.lib.math.RangeKeyVal} operator to get high/low
+   * value for each key within given application window.
+   * Get High-low range for 1 minute.
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @param appWindowCount Operator aggregate window size.
+   * @return RangeKeyVal instance.
+   */
+  public RangeKeyVal<String, Double> getHighLowOperator(String name, DAG dag, int appWindowCount)
+  {
+    RangeKeyVal<String, Double> oper = dag.addOperator(name, new RangeKeyVal<String, Double>());
+    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, appWindowCount);
+    oper.setType(Double.class);
+    return oper;
+  }
+
+  /**
+   * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send
+   * Quote (Merge price, daily volume, time)
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @return ConsolidatorKeyVal instance.
+   */
+  public ConsolidatorKeyVal<String,Double,Long,String,?,?> getQuoteOperator(String name, DAG dag)
+  {
+    ConsolidatorKeyVal<String,Double,Long,String,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,Double,Long,String,Object,Object>());
+    return oper;
+  }
+
+  /**
+   * Instantiate {@link com.datatorrent.lib.stream.ConsolidatorKeyVal} to send
+   * Chart (Merge minute volume and minute high-low)
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @return ConsolidatorKeyVal instance.
+   */
+  public ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> getChartOperator(String name, DAG dag)
+  {
+    ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> oper = dag.addOperator(name, new ConsolidatorKeyVal<String,HighLow<Double>,Long,Object,Object,Object>());
+    return oper;
+  }
+
+  /**
+   * Instantiate {@link com.datatorrent.lib.multiwindow.SimpleMovingAverage} to calculate moving average for price
+   * over given window size. Sliding window size is 1.
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @param appWindowCount Operator aggregate window size.
+   * @return SimpleMovingAverage instance.
+   */
+  public SimpleMovingAverage<String, Double> getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
+  {
+    SimpleMovingAverage<String, Double> oper = dag.addOperator(name, new SimpleMovingAverage<String, Double>());
+    oper.setWindowSize(appWindowCount);
+    oper.setType(Double.class);
+    return oper;
+  }
+
+  /**
+   * Add a console output operator that prints every tuple prefixed with {@code prefix}.
+   * @param name Operator name
+   * @param dag Application DAG graph.
+   * @param prefix Text printed before each tuple, followed by ": ".
+   * @return input port for console output.
+   */
+  public InputPort<Object> getConsole(String name, DAG dag, String prefix)
+  {
+    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
+    oper.setStringFormat(prefix + ": %s");
+    return oper.input;
+  }
+
+  /**
+   * Populate Yahoo Finance Example Application DAG: a tick input feeding quote, chart and
+   * price-SMA pipelines, each ending in a console operator.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+
+    dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);
+
+    StockTickInput tick = getStockTickInputOperator("StockTickInput", dag);
+    SumKeyVal<String, Long> dailyVolume = getDailyVolumeOperator("DailyVolume", dag);
+    ConsolidatorKeyVal<String,Double,Long,String,?,?> quoteOperator = getQuoteOperator("Quote", dag);
+
+    RangeKeyVal<String, Double> highlow = getHighLowOperator("HighLow", dag, appWindowCountMinute);
+    SumKeyVal<String, Long> minuteVolume = getMinuteVolumeOperator("MinuteVolume", dag, appWindowCountMinute);
+    ConsolidatorKeyVal<String,HighLow<Double>,Long,?,?,?> chartOperator = getChartOperator("Chart", dag);
+
+    SimpleMovingAverage<String, Double> priceSMA = getPriceSimpleMovingAverageOperator("PriceSMA", dag, appWindowCountSMA);
+    DefaultPartitionCodec<String, Double> codec = new DefaultPartitionCodec<String, Double>();
+    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
+    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
+    dag.addStream("price", tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
+    dag.addStream("vol", tick.volume, dailyVolume.data, minuteVolume.data);
+    dag.addStream("time", tick.time, quoteOperator.in3);
+    dag.addStream("daily_vol", dailyVolume.sum, quoteOperator.in2);
+
+    dag.addStream("quote_data", quoteOperator.out, getConsole("quoteConsole", dag, "QUOTE"));
+
+    dag.addStream("high_low", highlow.range, chartOperator.in1);
+    dag.addStream("vol_1min", minuteVolume.sum, chartOperator.in2);
+    dag.addStream("chart_data", chartOperator.out, getConsole("chartConsole", dag, "CHART"));
+
+    dag.addStream("sma_price", priceSMA.doubleSMA, getConsole("priceSMAConsole", dag, "Price SMA"));
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceCSVInputOperator.java
----------------------------------------------------------------------
diff --git a/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceCSVInputOperator.java b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceCSVInputOperator.java
new file mode 100644
index 0000000..86b6263
--- /dev/null
+++ b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/YahooFinanceCSVInputOperator.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.yahoofinance; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpStatus; +import org.apache.commons.httpclient.cookie.CookiePolicy; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.params.DefaultHttpParams; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.SimpleSinglePortInputOperator; + +import au.com.bytecode.opencsv.CSVReader; + +/** + * Grabs Yahoo Finance quotes data and emits HashMap, with key equals the format name (e.g. "s0") <p> + * + * @since 0.3.2 + */ +public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<HashMap<String, Object>> implements Runnable +{ + private static final Logger logger = LoggerFactory.getLogger(YahooFinanceCSVInputOperator.class); + /** + * Timeout interval for reading from server. 0 or negative indicates no timeout. + */ + private int readIntervalMillis = 500; + + /** + * The URL of the web service resource for the POST request. 
+ */ + private String url; + private transient HttpClient client; + private transient GetMethod method; + + private ArrayList<String> symbolList = new ArrayList<String>(); + private ArrayList<String> parameterList = new ArrayList<String>(); + + public void addSymbol(String symbol) + { + symbolList.add(symbol); + } + + public void addFormat(String format) + { + parameterList.add(format); + } + + public ArrayList<String> getSymbolList() + { + return symbolList; + } + + public ArrayList<String> getParameterList() + { + return parameterList; + } + + public int getReadIntervalMillis() + { + return readIntervalMillis; + } + + public void setReadIntervalMillis(int readIntervalMillis) + { + this.readIntervalMillis = readIntervalMillis; + } + + /** + * Prepare URL from symbols and parameters. + * URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=GOOG,FB,YHOO&f=sl1vt1&e=.csv + * @return + */ + private String prepareURL() + { + String str = "http://download.finance.yahoo.com/d/quotes.csv?"; + + str += "s="; + for (int i = 0; i < symbolList.size(); i++) { + if (i != 0) { + str += ","; + } + str += symbolList.get(i); + } + str += "&f="; + for (String format: parameterList) { + str += format; + } + str += "&e=.csv"; + return str; + } + + @Override + public void setup(OperatorContext context) + { + url = prepareURL(); + client = new HttpClient(); + method = new GetMethod(url); + DefaultHttpParams.getDefaultParams().setParameter("http.protocol.cookie-policy", CookiePolicy.BROWSER_COMPATIBILITY); + } + + @Override + public void run() + { + while (true) { + try { + int statusCode = client.executeMethod(method); + if (statusCode != HttpStatus.SC_OK) { + logger.error("Method failed: " + method.getStatusLine()); + } else { + InputStream istream; + istream = method.getResponseBodyAsStream(); + // Process response + InputStreamReader isr = new InputStreamReader(istream); + CSVReader reader = new CSVReader(isr); + List<String[]> myEntries; + myEntries = 
reader.readAll(); + for (String[] stringArr: myEntries) { + HashMap<String,Object> hm = new HashMap<String,Object>(); + for (int i = 0; i < parameterList.size(); i++) { + hm.put(parameterList.get(i), stringArr[i]); + } + outputPort.emit(hm); // send out one symbol at a time + } + } + Thread.sleep(readIntervalMillis); + } catch (InterruptedException ex) { + logger.debug(ex.toString()); + } catch (IOException ex) { + logger.debug(ex.toString()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/doc-files/Application.gif ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/doc-files/Application.gif b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/doc-files/Application.gif new file mode 100644 index 0000000..8fc8331 Binary files /dev/null and b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/doc-files/Application.gif differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/package-info.java ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/package-info.java b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/package-info.java new file mode 100644 index 0000000..4a7bb10 --- /dev/null +++ b/examples/yahoofinance/src/main/java/org/apache/apex/examples/yahoofinance/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Yahoo Finance demonstration applications. + */ +package org.apache.apex.examples.yahoofinance; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/main/resources/META-INF/properties.xml b/examples/yahoofinance/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..bb5f201 --- /dev/null +++ b/examples/yahoofinance/src/main/resources/META-INF/properties.xml @@ -0,0 +1,81 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<!--Configuration for chart example --> +<configuration> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> + <value>1000</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.AverageChart.xAxisLabel</name> + <value>TIME</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.StockTickInput.tickers</name> + <value>IBM,GOOG,AAPL,YHOO</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128M</value> + </property> + <property> + <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.YahooFinanceExample.operator.StockTickInput.tickers</name> + <value>IBM,GOOG,AAPL,YHOO</value> + </property> + <property> + <name>dt.application.YahooFinanceWithoutChartExample.operator.StockTickInput.tickers</name> + <value>IBM,GOOG,AAPL,YHOO</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.AverageChart.yAxisLabel</name> + <value>PRICE</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.CandleStickChart.xAxisLabel</name> + <value>TIME</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.CandleStickChart.yAxisLabel</name> + <value>PRICE</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.AverageChart.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> + <property> + <name>dt.application.YahooFinanceExampleWithChart.operator.AverageChart.attr.APPLICATION_WINDOW_COUNT + </name> + <value>5</value> + </property> +</configuration> 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationTest.java b/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationTest.java new file mode 100644 index 0000000..a8c3cb9 --- /dev/null +++ b/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.yahoofinance; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +/** + * Run Yahoo Finance application example. + * + */ +public class ApplicationTest +{ + + /** + * This will run for ever. 
+ * + * @throws Exception + */ + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + new YahooFinanceApplication().populateDAG(lma.getDAG(), new Configuration(false)); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQLTest.java ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQLTest.java b/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQLTest.java new file mode 100644 index 0000000..bd2db3d --- /dev/null +++ b/examples/yahoofinance/src/test/java/org/apache/apex/examples/yahoofinance/ApplicationWithDerbySQLTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.yahoofinance; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +/** + * + */ +public class ApplicationWithDerbySQLTest +{ + private final transient Logger LOG = LoggerFactory.getLogger(ApplicationWithDerbySQLTest.class); + public ApplicationWithDerbySQLTest() + { + } + + @Test + public void testSomeMethod() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + new ApplicationWithDerbySQL().populateDAG(lma.getDAG(), new Configuration(false)); + LocalMode.Controller lc = lma.getController(); + + long start = System.currentTimeMillis(); + lc.run(); + long end = System.currentTimeMillis(); + long time = end - start; + LOG.debug("Test used " + time + " ms"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/test/resources/alert_create.json ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/test/resources/alert_create.json b/examples/yahoofinance/src/test/resources/alert_create.json new file mode 100644 index 0000000..3059145 --- /dev/null +++ b/examples/yahoofinance/src/test/resources/alert_create.json @@ -0,0 +1,24 @@ +{ + "name":"alertName", + "streamName":"yahooFinance.outputPort", + "filter": { + "class": "com.datatorrent.lib.util.JavaScriptFilterOperator", + "properties": { + "setupScript":"function f() { return s0 == \"AAPL\" && l1 > 508 }", + "functionName":"f" + } + }, + "escalation": { + "class": "com.datatorrent.lib.util.AlertEscalationOperator", + "properties": { + "alertInterval":"5000", + "timeout":"10000" + } + }, + "actions": [{ + "outputPort":"alert", + "inputPort":"input", + "class":"com.datatorrent.lib.io.ConsoleOutputOperator" + }], + "saveAs":"firstAlert" +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/yahoofinance/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/yahoofinance/src/test/resources/log4j.properties b/examples/yahoofinance/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/yahoofinance/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/library/src/test/resources/SocketInputOperatorTest.txt ---------------------------------------------------------------------- diff --git a/library/src/test/resources/SocketInputOperatorTest.txt b/library/src/test/resources/SocketInputOperatorTest.txt index 319c661..914cd5d 100644 --- a/library/src/test/resources/SocketInputOperatorTest.txt +++ b/library/src/test/resources/SocketInputOperatorTest.txt @@ -1 +1 @@ -Malhar repository contains open source operator and codec library that can be used with the Apache Apex (incubating) platform to build Realtime streaming applications. In addition to the library there are benchmark, contrib, demos, webdemos and samples folders available. Demos contain demo applications built using the library operators. Webdemos contain webpages for the demos. Benchmark contains performance testing applications. 
Contrib contains additional operators that interface with third party softwares. Samples contain some sample code that shows how to use the library operators. +Malhar repository contains open source operator and codec library that can be used with the Apache Apex (incubating) platform to build Realtime streaming applications. In addition to the library there are benchmark, contrib, examples, webdemos and samples folders available. Examples contain example applications built using the library operators. Webdemos contain webpages for the examples. Benchmark contains performance testing applications. Contrib contains additional operators that interface with third party softwares. Samples contain some sample code that shows how to use the library operators. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 79de8f4..9cc611a 100644 --- a/pom.xml +++ b/pom.xml @@ -210,7 +210,7 @@ <modules> <module>library</module> <module>contrib</module> - <module>demos</module> + <module>examples</module> </modules> <dependencies>
