Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 41caa952e -> c31c5eab3
MLHR-1892 Convert comments to javadoc format Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c9fd68d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c9fd68d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c9fd68d7 Branch: refs/heads/devel-3 Commit: c9fd68d77b761ad93dfc43674269e5142c08c7df Parents: 217f8db Author: Munagala V. Ramanath <[email protected]> Authored: Mon Nov 30 13:22:06 2015 -0800 Committer: Munagala V. Ramanath <[email protected]> Committed: Wed Dec 2 21:39:53 2015 -0800 ---------------------------------------------------------------------- .../wordcount/ApplicationWithQuerySupport.java | 20 ++++ .../demos/wordcount/FileWordCount.java | 112 ++++++++++++++----- .../datatorrent/demos/wordcount/LineReader.java | 28 ++++- .../com/datatorrent/demos/wordcount/WCPair.java | 22 +++- .../demos/wordcount/WindowWordCount.java | 24 ++-- .../demos/wordcount/WordCountWriter.java | 24 +++- .../datatorrent/demos/wordcount/WordReader.java | 24 +++- 7 files changed, 205 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java index 92d0b3d..5492b5e 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java @@ -38,13 +38,33 @@ import com.datatorrent.lib.io.ConsoleOutputOperator; import 
org.apache.hadoop.conf.Configuration; +/** + * Simple demo that counts word frequencies from any file dropped into a + * monitored directory. It outputs the top N word-frequency pairs for each file + * as well as globally across all files. + * <p> + * Each input file generates a corresponding output file in the output directory + * containing the top N pairs for that file. The output is also written + * to an internal store to support visualization in the UI via queries. + * <p> + * @since 3.2.0 + */ @ApplicationAnnotation(name="TopNWordsWithQueries") public class ApplicationWithQuerySupport implements StreamingApplication { private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class); + /** + * name of schema file + */ public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json"; + /** + * Populates the DAG with operators and connecting streams + * + * @param dag The directed acyclic graph of operators to populate + * @param conf The configuration object + */ @Override public void populateDAG(DAG dag, Configuration conf) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java index ee9439e..a9d41f2 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java @@ -25,8 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -//import org.apache.commons.lang.mutable.MutableInt; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,33 +60,58 @@ public class FileWordCount extends BaseOperator private static 
final Logger LOG = LoggerFactory.getLogger(FileWordCount.class); private static final String GLOBAL = "global"; - // If topN > 0, only data for the topN most frequent words is output; if topN == 0, the - // entire frequency map is output - // + /** + * If {@literal topN > 0}, only data for the topN most frequent words is output; if topN == 0, the + * entire frequency map is output + */ protected int topN; - // set to true when we get an EOF control tuple + /** + * set to true when an EOF control tuple for the current input file is received; reset to false + * when the corresponding output file has been written. + */ protected boolean eof = false; - // last component of path (i.e. only file name) - // incoming value from control tuple + /** + * last component of path (just the file name) + * incoming value from control tuple + */ protected String fileName; - // wordMapFile : {word => frequency} map, current file, all words - // wordMapGlobal : {word => frequency} map, global, all words - // + /** + * {@literal (word => frequency)} map: current file, all words + */ protected Map<String, WCPair> wordMapFile = new HashMap<>(); + + /** + * {@literal (word => frequency)} map: global, all words + */ protected Map<String, WCPair> wordMapGlobal = new HashMap<>(); - // resultPerFile : singleton list [TopNMap] with per file data; sent on outputPerFile - // resultGlobal : singleton list [wordFreqMap] with per file data; sent on outputGlobal - // - protected transient List<Map<String, Object>> resultPerFile, resultGlobal; + /** + * singleton list with per file data; sent on {@code outputPerFile} + */ + protected transient List<Map<String, Object>> resultPerFile; - // singleton map of fileName to sorted list of (word, frequency) pairs + /** + * singleton list with global data; sent on {@code outputGlobal} + */ + protected transient List<Map<String, Object>> resultGlobal; + + /** + * singleton map of {@code fileName} to sorted list of (word, frequency) pairs + */ protected 
transient Map<String, Object> resultFileFinal; + + /** + * final list of (word, frequency) pairs written to output file + */ protected transient List<WCPair> fileFinalList; + /** + * input port on which per-window {@literal (word => frequency)} map is received; the map + * is merged into {@code wordMapFile} and {@code wordMapGlobal}. + */ public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>() { @Override @@ -123,6 +146,9 @@ public class FileWordCount extends BaseOperator } }; + /** + * control port on which the current file name is received to indicate EOF + */ @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<String> control = new DefaultInputPort<String>() { @@ -139,30 +165,46 @@ public class FileWordCount extends BaseOperator } }; - // outputPerFile -- tuple is TopNMap for current file - // outputGlobal -- tuple is TopNMap globally - // + /** + * output port for current file output + */ public final transient DefaultOutputPort<List<Map<String, Object>>> outputPerFile = new DefaultOutputPort<>(); + /** + * output port for global output + */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<List<Map<String, Object>>> outputGlobal = new DefaultOutputPort<>(); - // fileOutput -- tuple is singleton map {<fileName> => TopNMap} where TopNMap is the final - // top N for current file; emitted on EOF - // + /** + * tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final + * top N pairs for current file and will be written to the output file; emitted in the + * {@code endWindow()} call after an EOF + */ public final transient DefaultOutputPort<Map<String, Object>> fileOutput = new DefaultOutputPort<>(); + /** + * get the number of top (word, frequency) pairs that will be output + */ public int getTopN() { return topN; } + /** + * set the number of top (word, frequency) pairs that will be output + * @param n The new number + */ public 
void setTopN(int n) { topN = n; } + /** + * {@inheritDoc} + * Initialize various map and list fields + */ @Override public void setup(OperatorContext context) { @@ -179,6 +221,14 @@ public class FileWordCount extends BaseOperator fileFinalList = new ArrayList<>(); } + /** + * {@inheritDoc} + * This is where we do most of the work: + * 1. Sort global map and emit top N pairs + * 2. Sort current file map and emit top N pairs + * 3. If we've seen EOF, emit top N pairs on port connected to file writer and clear all per-file + * data structures. + */ @Override public void endWindow() { @@ -233,10 +283,11 @@ public class FileWordCount extends BaseOperator } } - // get topN frequencies from map, convert each pair to a singleton map and append to result - // This map is suitable input to AppDataSnapshotServer - // MUST have map.size() > 0 here - // + /** + * get topN frequencies from map, convert each pair to a singleton map and append to result + * This map is suitable input to AppDataSnapshotServer + * MUST have {@code map.size() > 0} here + */ private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result) { final ArrayList<WCPair> list = new ArrayList<>(map.values()); @@ -267,10 +318,11 @@ public class FileWordCount extends BaseOperator list.clear(); } - // populate fileFinalList with topN frequencies from argument - // This list is suitable input to WordCountWriter which writes it to a file - // MUST have map.size() > 0 here - // + /** + * populate fileFinalList with topN frequencies from argument + * This list is suitable input to WordCountWriter which writes it to a file + * MUST have {@code map.size() > 0} here + */ private void getTopNList(final Map<String, WCPair> map) { fileFinalList.clear(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java ---------------------------------------------------------------------- diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java index 135005d..4dde0c5 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java @@ -32,15 +32,24 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.lib.io.fs.AbstractFileInputOperator; -// reads lines from input file and returns them; if end-of-file is reached, a control tuple -// is emitted on the control port -// +/** + * Reads lines from input file and returns them. If EOF is reached, a control tuple + * is emitted on the control port + * + * @since 3.2.0 + */ public class LineReader extends AbstractFileInputOperator<String> { private static final Logger LOG = LoggerFactory.getLogger(LineReader.class); + /** + * output port on which lines from current file name are emitted + */ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + /** + * control port on which the current file name is emitted to indicate EOF + */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); @@ -48,6 +57,10 @@ public class LineReader extends AbstractFileInputOperator<String> private Path path; + /** + * file open callback; wrap the file input stream in a buffered reader for reading lines + * @param curPath The path to the file just opened + */ @Override protected InputStream openFile(Path curPath) throws IOException { @@ -58,6 +71,10 @@ public class LineReader extends AbstractFileInputOperator<String> return is; } + /** + * file close callback; close buffered reader + * @param is File input stream that will imminently be closed + */ @Override protected void closeFile(InputStream is) throws IOException { @@ -67,7 +84,10 @@ 
public class LineReader extends AbstractFileInputOperator<String> path = null; } - // return empty string + /** + * {@inheritDoc} + * if we hit EOF, emit file name on control port + */ @Override protected String readEntity() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java index 5aea536..867ed13 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java @@ -18,13 +18,33 @@ */ package com.datatorrent.demos.wordcount; -// a single (word, frequency) pair +/** + * a single (word, frequency) pair + * + * @since 3.2.0 + */ public class WCPair { + + /** + * The word + */ public String word; + + /** + * The frequency + */ public int freq; + /** + * Default constructor + */ public WCPair() {} + /** + * Create new object with given values + * @param w The word + * @param f The frequency + */ public WCPair(String w, int f) { word = w; freq = f; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java index 239d55a..a448366 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java @@ -30,16 +30,22 @@ import com.datatorrent.api.DefaultInputPort; import 
com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.util.BaseOperator; -// Computes word frequency counts per window and emits them at each endWindow. The output is a -// list of pairs (word, frequency). -// +/** + * Computes word frequency counts per window and emits them at each endWindow. The output is a + * list of (word, frequency) pairs + * + * @since 3.2.0 + */ public class WindowWordCount extends BaseOperator { private static final Logger LOG = LoggerFactory.getLogger(WindowWordCount.class); - // wordMap : word => frequency + /** {@literal (word => frequency)} map for current window */ protected Map<String, WCPair> wordMap = new HashMap<>(); + /** + * input port on which words are received + */ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() { @Override @@ -59,11 +65,15 @@ public class WindowWordCount extends BaseOperator } }; - // output port which emits the list of word frequencies for current window - // fileName => list of (word, freq) pairs - // + /** + * output port which emits the list of word frequencies for current window + */ public final transient DefaultOutputPort<List<WCPair>> output = new DefaultOutputPort<>(); + /** + * {@inheritDoc} + * If we've seen some words in this window, emit the map and clear it for next window + */ @Override public void endWindow() { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java index ee61256..2b9562d 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java @@ -28,8 +28,11 @@ 
import org.slf4j.LoggerFactory; import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; -// write top N words and their frequencies to a file -// +/** + * write top N words and their frequencies to a file + * + * @since 3.2.0 + */ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Object>> { private static final Logger LOG = LoggerFactory.getLogger(WordCountWriter.class); @@ -39,6 +42,10 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje private String fileName; // current file name private transient final StringBuilder sb = new StringBuilder(); + /** + * {@inheritDoc} + * Invoke requestFinalize() to create the output file with the desired name without decorations. + */ @Override public void endWindow() { @@ -48,9 +55,11 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje super.endWindow(); } - // input is a singleton list [M] where M is a singleton map {fileName => L} where L is a - // list of pairs: (word, frequency) - // + /** + * Extracts file name from argument + * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs} + * @return the file name to write the tuple to + */ @Override protected String getFileName(Map<String, Object> tuple) { @@ -62,6 +71,11 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje return fileName; } + /** + * Extracts output file content from argument + * @param tuple Singleton map {@literal (fileName => L) where L is a list of (word, frequency) pairs} + * @return input tuple converted to an array of bytes + */ @Override protected byte[] getBytesForTuple(Map<String, Object> tuple) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java ---------------------------------------------------------------------- diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java index 16ee246..ed1f7bb 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java @@ -25,17 +25,25 @@ import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.util.BaseOperator; -// extracts words from input line +/** + * extracts words from input line + */ public class WordReader extends BaseOperator { // default pattern for word-separators private static final Pattern nonWordDefault = Pattern.compile("[\\p{Punct}\\s]+"); - private String nonWordStr; // configurable regex + private String nonWordStr; // configurable regex private transient Pattern nonWord; // compiled regex + /** + * output port on which words from the current file are emitted + */ public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + /** + * input port on which lines from the current file are received + */ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() { @@ -51,14 +59,26 @@ public class WordReader extends BaseOperator } }; + /** + * Returns the regular expression that matches strings between words + * @return Regular expression for strings that separate words + */ public String getNonWordStr() { return nonWordStr; } + /** + * Sets the regular expression that matches strings between words + * @param regex New regular expression for strings that separate words + */ public void setNonWordStr(String regex) { nonWordStr = regex; } + /** + * {@inheritDoc} + * Set nonWord to the default pattern if necessary + */ @Override public void setup(OperatorContext context) {
