Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 41caa952e -> c31c5eab3


MLHR-1892 Convert comments to javadoc format


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c9fd68d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c9fd68d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c9fd68d7

Branch: refs/heads/devel-3
Commit: c9fd68d77b761ad93dfc43674269e5142c08c7df
Parents: 217f8db
Author: Munagala V. Ramanath <[email protected]>
Authored: Mon Nov 30 13:22:06 2015 -0800
Committer: Munagala V. Ramanath <[email protected]>
Committed: Wed Dec 2 21:39:53 2015 -0800

----------------------------------------------------------------------
 .../wordcount/ApplicationWithQuerySupport.java  |  20 ++++
 .../demos/wordcount/FileWordCount.java          | 112 ++++++++++++++-----
 .../datatorrent/demos/wordcount/LineReader.java |  28 ++++-
 .../com/datatorrent/demos/wordcount/WCPair.java |  22 +++-
 .../demos/wordcount/WindowWordCount.java        |  24 ++--
 .../demos/wordcount/WordCountWriter.java        |  24 +++-
 .../datatorrent/demos/wordcount/WordReader.java |  24 +++-
 7 files changed, 205 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
index 92d0b3d..5492b5e 100644
--- 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
+++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
@@ -38,13 +38,33 @@ import com.datatorrent.lib.io.ConsoleOutputOperator;
 
 import org.apache.hadoop.conf.Configuration;
 
+/**
+ * Simple demo that counts word frequencies from any file dropped into a
+ * monitored directory. It outputs the top N word-frequency pairs for each file
+ * as well as globally across all files.
+ * <p>
+ * Each input file generates a corresponding output file in the output 
directory
+ * containing the top N pairs for that file. The output is also written
+ * to an internal store to support visualization in the UI via queries.
+ * <p>
+ * @since 3.2.0
+ */
 @ApplicationAnnotation(name="TopNWordsWithQueries")
 public class ApplicationWithQuerySupport implements StreamingApplication
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationWithQuerySupport.class);
 
+  /**
+   * name of schema file
+   */
   public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json";
 
+  /**
+   * Populates the DAG with operators and connecting streams
+   *
+   * @param dag The directed acyclic graph of operators to populate
+   * @param conf The Hadoop configuration object
+   */
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
index ee9439e..a9d41f2 100644
--- 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
+++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-//import org.apache.commons.lang.mutable.MutableInt;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,33 +60,58 @@ public class FileWordCount extends BaseOperator
   private static final Logger LOG = 
LoggerFactory.getLogger(FileWordCount.class);
   private static final String GLOBAL = "global";
 
-  // If topN > 0, only data for the topN most frequent words is output; if 
topN == 0, the
-  // entire frequency map is output
-  //
+  /**
+   * If {@literal topN > 0}, only data for the topN most frequent words is 
output; if topN == 0, the
+   * entire frequency map is output
+   */
   protected int topN;
 
-  // set to true when we get an EOF control tuple
+  /**
+   * set to true when an EOF control tuple for the current input file is 
received; reset to false
+   * when the corresponding output file has been written.
+   */
   protected boolean eof = false;
 
-  // last component of path (i.e. only file name)
-  // incoming value from control tuple
+  /**
+   * last component of path (just the file name);
+   * incoming value from control tuple
+   */
   protected String fileName;
 
-  // wordMapFile   : {word => frequency} map, current file, all words
-  // wordMapGlobal : {word => frequency} map, global, all words
-  //
+  /**
+   * {@literal (word => frequency)} map: current file, all words
+   */
   protected Map<String, WCPair> wordMapFile = new HashMap<>();
+
+  /**
+   * {@literal (word => frequency)} map: global, all words
+   */
   protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
 
-  // resultPerFile : singleton list [TopNMap] with per file data; sent on 
outputPerFile
-  // resultGlobal : singleton list [wordFreqMap] with per file data; sent on 
outputGlobal
-  //
-  protected transient List<Map<String, Object>> resultPerFile, resultGlobal;
+  /**
+   * singleton list with per file data; sent on {@code outputPerFile}
+   */
+  protected transient List<Map<String, Object>> resultPerFile;
 
-  // singleton map of fileName to sorted list of (word, frequency) pairs
+  /**
+   * singleton list with global data; sent on {@code outputGlobal}
+   */
+  protected transient List<Map<String, Object>> resultGlobal;
+
+  /**
+   * singleton map of {@code fileName} to sorted list of (word, frequency) 
pairs
+   */
   protected transient Map<String, Object> resultFileFinal;
+
+  /**
+   * final list of (word, frequency) pairs written to output file
+   */
   protected transient List<WCPair> fileFinalList;
 
+  /**
+   * input port on which per-window {@literal (word => frequency)} map is 
received; the map
+   * is merged into {@code wordMapFile} and {@code wordMapGlobal}.
+   */
   public final transient DefaultInputPort<List<WCPair>> input = new 
DefaultInputPort<List<WCPair>>()
   {
     @Override
@@ -123,6 +146,9 @@ public class FileWordCount extends BaseOperator
     }
   };
 
+  /**
+   * control port on which the current file name is received to indicate EOF
+   */
   @InputPortFieldAnnotation(optional = true)
   public final transient DefaultInputPort<String> control = new 
DefaultInputPort<String>()
   {
@@ -139,30 +165,46 @@ public class FileWordCount extends BaseOperator
     }
   };
 
-  // outputPerFile -- tuple is TopNMap for current file
-  // outputGlobal --  tuple is TopNMap globally
-  //
+  /**
+   * output port for current file output
+   */
   public final transient DefaultOutputPort<List<Map<String, Object>>>
     outputPerFile = new DefaultOutputPort<>();
 
+  /**
+   * output port for global output
+   */
   @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<List<Map<String, Object>>>
     outputGlobal = new DefaultOutputPort<>();
 
-  // fileOutput -- tuple is singleton map {<fileName> => TopNMap} where 
TopNMap is the final
-  //               top N for current file; emitted on EOF
-  //
+  /**
+   * tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} 
is the final
+   * top N pairs for current file and will be written to the output file; 
emitted in the
+   * {@code endWindow()} call after an EOF
+   */
   public final transient DefaultOutputPort<Map<String, Object>>
     fileOutput = new DefaultOutputPort<>();
 
+  /**
+   * get the number of top (word, frequency) pairs that will be output
+   */
   public int getTopN() {
     return topN;
   }
 
+  /**
+   * set the number of top (word, frequency) pairs that will be output
+   * @param n The new number
+   */
   public void setTopN(int n) {
     topN = n;
   }
 
+  /**
+   * {@inheritDoc}
+   * Initialize various map and list fields
+   */
   @Override
   public void setup(OperatorContext context)
   {
@@ -179,6 +221,14 @@ public class FileWordCount extends BaseOperator
     fileFinalList = new ArrayList<>();
   }
 
+  /**
+   * {@inheritDoc}
+   * This is where we do most of the work:
+   * 1. Sort global map and emit top N pairs
+   * 2. Sort current file map and emit top N pairs
+   * 3. If we've seen EOF, emit top N pairs on port connected to file writer 
and clear all per-file
+   *    data structures.
+   */
   @Override
   public void endWindow()
   {
@@ -233,10 +283,11 @@ public class FileWordCount extends BaseOperator
     }
   }
 
-  // get topN frequencies from map, convert each pair to a singleton map and 
append to result
-  // This map is suitable input to AppDataSnapshotServer
-  // MUST have map.size() > 0 here
-  //
+  /**
+   * get topN frequencies from map, convert each pair to a singleton map and 
append to result
+   * This map is suitable input to AppDataSnapshotServer
+   * MUST have {@code map.size() > 0} here
+   */
   private void getTopNMap(final Map<String, WCPair> map, List<Map<String, 
Object>> result)
   {
     final ArrayList<WCPair> list = new ArrayList<>(map.values());
@@ -267,10 +318,11 @@ public class FileWordCount extends BaseOperator
     list.clear();
   }
 
-  // populate fileFinalList with topN frequencies from argument
-  // This list is suitable input to WordCountWriter which writes it to a file
-  // MUST have map.size() > 0 here
-  //
+  /**
+   * populate fileFinalList with topN frequencies from argument
+   * This list is suitable input to WordCountWriter which writes it to a file
+   * MUST have {@code map.size() > 0} here
+   */
   private void getTopNList(final Map<String, WCPair> map)
   {
     fileFinalList.clear();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
index 135005d..4dde0c5 100644
--- 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
+++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
@@ -32,15 +32,24 @@ import 
com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
-// reads lines from input file and returns them; if end-of-file is reached, a 
control tuple
-// is emitted on the control port
-//
+/**
+ * Reads lines from input file and returns them. If EOF is reached, a control 
tuple
+ * is emitted on the control port
+ *
+ * @since 3.2.0
+ */
 public class LineReader extends AbstractFileInputOperator<String>
 {
   private static final Logger LOG = LoggerFactory.getLogger(LineReader.class);
 
+  /**
+   * output port on which lines from the current file are emitted
+   */
   public final transient DefaultOutputPort<String> output  = new 
DefaultOutputPort<>();
 
+  /**
+   * control port on which the current file name is emitted to indicate EOF
+   */
   @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<String> control = new 
DefaultOutputPort<>();
 
@@ -48,6 +57,10 @@ public class LineReader extends 
AbstractFileInputOperator<String>
 
   private Path path;
 
+  /**
+   * file open callback; wrap the file input stream in a buffered reader for 
reading lines
+   * @param curPath The path to the file just opened
+   */
   @Override
   protected InputStream openFile(Path curPath) throws IOException
   {
@@ -58,6 +71,10 @@ public class LineReader extends 
AbstractFileInputOperator<String>
     return is;
   }
 
+  /**
+   * file close callback; close buffered reader
+   * @param is File input stream that will imminently be closed
+   */
   @Override
   protected void closeFile(InputStream is) throws IOException
   {
@@ -67,7 +84,10 @@ public class LineReader extends 
AbstractFileInputOperator<String>
     path = null;
   }
 
-  // return empty string 
+  /**
+   * {@inheritDoc}
+   * if we hit EOF, emit file name on control port
+   */
   @Override
   protected String readEntity() throws IOException
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
index 5aea536..867ed13 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
@@ -18,13 +18,33 @@
  */
 package com.datatorrent.demos.wordcount;
 
-// a single (word, frequency) pair
+/**
+ * a single (word, frequency) pair
+ *
+ * @since 3.2.0
+ */
 public class WCPair {
+
+  /**
+   * The word
+   */
   public String word;
+
+  /**
+   * The frequency
+   */
   public int freq;
 
+  /**
+   * Default constructor
+   */
   public WCPair() {}
 
+  /**
+   * Create new object with given values
+   * @param w The word
+   * @param f The frequency
+   */
   public WCPair(String w, int f) {
     word = w;
     freq = f;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
index 239d55a..a448366 100644
--- 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
+++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
@@ -30,16 +30,22 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.common.util.BaseOperator;
 
-// Computes word frequency counts per window and emits them at each endWindow. 
The output is a
-// list of pairs (word, frequency).
-//
+/**
+ * Computes word frequency counts per window and emits them at each endWindow. 
The output is a
+ * list of (word, frequency) pairs
+ *
+ * @since 3.2.0
+ */
 public class WindowWordCount extends BaseOperator
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(WindowWordCount.class);
 
-  // wordMap : word => frequency
+  /** {@literal (word => frequency)} map for current window */
   protected Map<String, WCPair> wordMap = new HashMap<>();
 
+  /**
+   * input port on which words are received
+   */
   public final transient DefaultInputPort<String> input = new 
DefaultInputPort<String>()
   {
     @Override
@@ -59,11 +65,15 @@ public class WindowWordCount extends BaseOperator
     }
   };
 
-  // output port which emits the list of word frequencies for current window
-  // fileName => list of (word, freq) pairs
-  //
+  /**
+   * output port which emits the list of word frequencies for current window
+   */
   public final transient DefaultOutputPort<List<WCPair>> output = new 
DefaultOutputPort<>();
 
+  /**
+   * {@inheritDoc}
+   * If we've seen some words in this window, emit the map and clear it for 
next window
+   */
   @Override
   public void endWindow()
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
index ee61256..2b9562d 100644
--- 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
+++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
@@ -28,8 +28,11 @@ import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
 
-// write top N words and their frequencies to a file
-//
+/**
+ * write top N words and their frequencies to a file
+ *
+ * @since 3.2.0
+ */
 public class WordCountWriter extends AbstractFileOutputOperator<Map<String, 
Object>>
 {
   private static final Logger LOG = 
LoggerFactory.getLogger(WordCountWriter.class);
@@ -39,6 +42,10 @@ public class WordCountWriter extends 
AbstractFileOutputOperator<Map<String, Obje
   private String fileName;    // current file name
   private transient final StringBuilder sb = new StringBuilder();
 
+  /**
+   * {@inheritDoc}
+   * Invoke requestFinalize() to create the output file with the desired name 
without decorations.
+   */
   @Override
   public void endWindow()
   {
@@ -48,9 +55,11 @@ public class WordCountWriter extends 
AbstractFileOutputOperator<Map<String, Obje
     super.endWindow();
   }
 
-  // input is a singleton list [M] where M is a singleton map {fileName => L} 
where L is a
-  // list of pairs: (word, frequency)
-  //
+  /**
+   * Extracts file name from argument
+   * @param tuple Singleton map {@literal (fileName => L) where L is a list of 
(word, frequency) pairs}
+   * @return the file name to write the tuple to
+   */
   @Override
   protected String getFileName(Map<String, Object> tuple)
   {
@@ -62,6 +71,11 @@ public class WordCountWriter extends 
AbstractFileOutputOperator<Map<String, Obje
     return fileName;
   }
 
+  /**
+   * Extracts output file content from argument
+   * @param tuple Singleton map {@literal (fileName => L) where L is a list of 
(word, frequency) pairs}
+   * @return input tuple converted to an array of bytes
+   */
   @Override
   protected byte[] getBytesForTuple(Map<String, Object> tuple)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c9fd68d7/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
----------------------------------------------------------------------
diff --git 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
index 16ee246..ed1f7bb 100644
--- 
a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
+++ 
b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
@@ -25,17 +25,25 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.common.util.BaseOperator;
 
-// extracts words from input line
+/**
+ * extracts words from input line
+ */
 public class WordReader extends BaseOperator
 {
   // default pattern for word-separators
   private static final Pattern nonWordDefault = 
Pattern.compile("[\\p{Punct}\\s]+");
 
-  private String nonWordStr;    // configurable regex
+  private String nonWordStr;              // configurable regex
   private transient Pattern nonWord;      // compiled regex
 
+  /**
+   * output port on which words from the current file are emitted
+   */
   public final transient DefaultOutputPort<String> output = new 
DefaultOutputPort<>();
 
+  /**
+   * input port on which lines from the current file are received
+   */
   public final transient DefaultInputPort<String>
     input = new DefaultInputPort<String>() {
 
@@ -51,14 +59,26 @@ public class WordReader extends BaseOperator
     }
   };
 
+  /**
+   * Returns the regular expression that matches strings between words
+   * @return Regular expression for strings that separate words
+   */
   public String getNonWordStr() {
     return nonWordStr;
   }
 
+  /**
+   * Sets the regular expression that matches strings between words
+   * @param regex New regular expression for strings that separate words
+   */
   public void setNonWordStr(String regex) {
     nonWordStr = regex;
   }
 
+  /**
+   * {@inheritDoc}
+   * Set nonWord to the default pattern if necessary
+   */
   @Override
   public void setup(OperatorContext context)
   {

Reply via email to