Repository: apex-malhar
Updated Branches:
  refs/heads/master 0a1adff8a -> 7d9386d2a


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
index fd67bf6..7e5bb93 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java
@@ -20,23 +20,22 @@ package com.datatorrent.demos.wordcount;
 
 import java.net.URI;
 
-import org.apache.commons.lang.StringUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.api.StreamingApplication;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
 
 import com.datatorrent.lib.appdata.schemas.SchemaUtils;
 import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
 import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Simple demo that computes word frequencies from any file dropped into a
@@ -49,7 +48,7 @@ import org.apache.hadoop.conf.Configuration;
  * <p>
  * @since 3.2.0
  */
-@ApplicationAnnotation(name="TopNWordsWithQueries")
+@ApplicationAnnotation(name = "TopNWordsWithQueries")
 public class ApplicationWithQuerySupport implements StreamingApplication
 {
   private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class);
@@ -87,13 +86,13 @@ public class ApplicationWithQuerySupport implements StreamingApplication
 
     String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
 
-    if ( ! StringUtils.isEmpty(gatewayAddress)) {        // add query support
+    if (!StringUtils.isEmpty(gatewayAddress)) {        // add query support
       URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
 
       AppDataSnapshotServerMap snapshotServerFile
-        = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
+          = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap());
       AppDataSnapshotServerMap snapshotServerGlobal
-        = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap());
+          = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap());
 
       String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
       snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON);
@@ -108,19 +107,17 @@ public class ApplicationWithQuerySupport implements StreamingApplication
       snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal);
 
       PubSubWebSocketAppDataResult wsResultFile
-        = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult());
+          = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult());
       PubSubWebSocketAppDataResult wsResultGlobal
-        = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult());
+          = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult());
       wsResultFile.setUri(uri);
       wsResultGlobal.setUri(uri);
 
       Operator.InputPort<String> queryResultFilePort = wsResultFile.input;
       Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input;
 
-      dag.addStream("WordCountsFile", fileWordCount.outputPerFile,
-                    snapshotServerFile.input, console.input);
-      dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal,
-                    snapshotServerGlobal.input);
+      dag.addStream("WordCountsFile", fileWordCount.outputPerFile, snapshotServerFile.input, console.input);
+      dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, snapshotServerGlobal.input);
 
       dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort);
       dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort);
@@ -129,7 +126,7 @@ public class ApplicationWithQuerySupport implements StreamingApplication
       dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input);
     }
 
-    System.out.println("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled());
+    LOG.info("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled());
     LOG.info("Returning from populateDAG");
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
index 8eb004a..e8a91b2 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/FileWordCount.java
@@ -28,11 +28,11 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.common.util.BaseOperator;
 
 /**
@@ -169,14 +169,14 @@ public class FileWordCount extends BaseOperator
    * Output port for current file output
    */
   public final transient DefaultOutputPort<List<Map<String, Object>>>
-    outputPerFile = new DefaultOutputPort<>();
+      outputPerFile = new DefaultOutputPort<>();
 
   /**
    * Output port for global output
    */
   @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<List<Map<String, Object>>>
-    outputGlobal = new DefaultOutputPort<>();
+      outputGlobal = new DefaultOutputPort<>();
 
   /**
    * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final
@@ -184,12 +184,13 @@ public class FileWordCount extends BaseOperator
    * {@code endWindow()} call after an EOF
    */
   public final transient DefaultOutputPort<Map<String, Object>>
-    fileOutput = new DefaultOutputPort<>();
+      fileOutput = new DefaultOutputPort<>();
 
   /**
    * Get the number of top (word, frequency) pairs that will be output
    */
-  public int getTopN() {
+  public int getTopN()
+  {
     return topN;
   }
 
@@ -197,7 +198,8 @@ public class FileWordCount extends BaseOperator
    * Set the number of top (word, frequency) pairs that will be output
    * @param n The new number
    */
-  public void setTopN(int n) {
+  public void setTopN(int n)
+  {
     topN = n;
   }
 
@@ -250,8 +252,7 @@ public class FileWordCount extends BaseOperator
       return;
     }
 
-    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}",
-             fileName, wordMapFile.size(), topN);
+    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN);
 
     // get topN list for this file and, if we have EOF, emit to fileOutput port
 
@@ -293,13 +294,15 @@ public class FileWordCount extends BaseOperator
     final ArrayList<WCPair> list = new ArrayList<>(map.values());
 
     // sort entries in descending order of frequency
-    Collections.sort(list, new Comparator<WCPair>() {
-        @Override
-        public int compare(WCPair o1, WCPair o2) {
-          return (int)(o2.freq - o1.freq);
-        }
+    Collections.sort(list, new Comparator<WCPair>()
+    {
+      @Override
+      public int compare(WCPair o1, WCPair o2)
+      {
+        return (int)(o2.freq - o1.freq);
+      }
     });
-  
+
     if (topN > 0) {
       list.subList(topN, map.size()).clear();      // retain only the first topN entries
     }
@@ -329,13 +332,15 @@ public class FileWordCount extends BaseOperator
     fileFinalList.addAll(map.values());
 
     // sort entries in descending order of frequency
-    Collections.sort(fileFinalList, new Comparator<WCPair>() {
-        @Override
-        public int compare(WCPair o1, WCPair o2) {
-          return (int)(o2.freq - o1.freq);
-        }
+    Collections.sort(fileFinalList, new Comparator<WCPair>()
+    {
+      @Override
+      public int compare(WCPair o1, WCPair o2)
+      {
+        return (int)(o2.freq - o1.freq);
+      }
     });
-  
+
     if (topN > 0) {
       fileFinalList.subList(topN, map.size()).clear();      // retain only the first topN entries
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
index fe10d73..8a1a57b 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/LineReader.java
@@ -23,13 +23,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 
-import org.apache.hadoop.fs.Path;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import org.apache.hadoop.fs.Path;
+
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
 /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
index 1817f2b..bb67622 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WCPair.java
@@ -23,8 +23,8 @@ package com.datatorrent.demos.wordcount;
  *
  * @since 3.2.0
  */
-public class WCPair {
-
+public class WCPair
+{
   /**
    * The word
    */
@@ -38,20 +38,25 @@ public class WCPair {
   /**
    * Default constructor
    */
-  public WCPair() {}
+  public WCPair()
+  {
+
+  }
 
   /**
    * Create new object with given values
    * @param w The word
    * @param f The frequency
    */
-  public WCPair(String w, int f) {
+  public WCPair(String w, int f)
+  {
     word = w;
     freq = f;
   }
-  
+
   @Override
-  public String toString() {
+  public String toString()
+  {
     return String.format("(%s, %d)", word, freq);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
index 999f32f..0edfd1e 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WindowWordCount.java
@@ -80,7 +80,9 @@ public class WindowWordCount extends BaseOperator
     LOG.info("WindowWordCount: endWindow");
 
     // got EOF; if no words found, do nothing
-    if (wordMap.isEmpty()) return;
+    if (wordMap.isEmpty()) {
+      return;
+    }
 
     // have some words; emit single map and reset for next file
     final ArrayList<WCPair> list = new ArrayList<>(wordMap.values());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
index 3ab20c6..3a88bab 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountInputOperator.java
@@ -18,84 +18,92 @@
  */
 package com.datatorrent.demos.wordcount;
 
-import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
-
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
+
 /**
  * <p>WordCountInputOperator class.</p>
  *
  * @since 0.3.2
  */
-public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class);
-    protected long averageSleep = 300;
-    protected long sleepPlusMinus = 100;
-    protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt";
+public class WordCountInputOperator extends SimpleSinglePortInputOperator<String> implements Runnable
+{
 
-    public void setAverageSleep(long as) {
-        averageSleep = as;
-    }
+  private static final Logger logger = LoggerFactory.getLogger(WordCountInputOperator.class);
+  protected long averageSleep = 300;
+  protected long sleepPlusMinus = 100;
+  protected String fileName = "com/datatorrent/demos/wordcount/samplefile.txt";
 
-    public void setSleepPlusMinus(long spm) {
-        sleepPlusMinus = spm;
-    }
+  public void setAverageSleep(long as)
+  {
+    averageSleep = as;
+  }
 
-    public void setFileName(String fn) {
-        fileName = fn;
-    }
+  public void setSleepPlusMinus(long spm)
+  {
+    sleepPlusMinus = spm;
+  }
 
-    @Override
-    public void run() {
-        BufferedReader br = null;
-        DataInputStream in = null;
-        InputStream fstream = null;
+  public void setFileName(String fn)
+  {
+    fileName = fn;
+  }
 
-        while (true) {
-            try {
-                String line;
-                fstream = this.getClass().getClassLoader().getResourceAsStream(fileName);
+  @Override
+  public void run()
+  {
+    BufferedReader br = null;
+    DataInputStream in = null;
+    InputStream fstream = null;
 
-                in = new DataInputStream(fstream);
-                br = new BufferedReader(new InputStreamReader(in));
+    while (true) {
+      try {
+        String line;
+        fstream = this.getClass().getClassLoader().getResourceAsStream(fileName);
 
-                while ((line = br.readLine()) != null) {
-                    String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'“”]+");
-                    for (String word : words) {
-                        word = word.trim().toLowerCase();
-                        if (!word.isEmpty()) {
-                            //System.out.println("Sending "+word);
-                            outputPort.emit(word);
-                        }
-                    }
-                    try {
-                        Thread.sleep(averageSleep + (new Double(sleepPlusMinus * (Math.random() * 2 - 1))).longValue());
-                    } catch (InterruptedException ex) {
-                        // nothing
-                    }
-                }
+        in = new DataInputStream(fstream);
+        br = new BufferedReader(new InputStreamReader(in));
 
-            } catch (IOException ex) {
-                logger.debug(ex.toString());
-            } finally {
-                try {
-                    if (br != null) {
-                        br.close();
-                    }
-                    if (in != null) {
-                        in.close();
-                    }
-                    if (fstream != null) {
-                        fstream.close();
-                    }
-                } catch (IOException exc) {
-                    // nothing
-                }
+        while ((line = br.readLine()) != null) {
+          String[] words = line.trim().split("[\\p{Punct}\\s\\\"\\'“”]+");
+          for (String word : words) {
+            word = word.trim().toLowerCase();
+            if (!word.isEmpty()) {
+              outputPort.emit(word);
             }
+          }
+          try {
+            Thread.sleep(averageSleep + (new Double(sleepPlusMinus * (Math.random() * 2 - 1))).longValue());
+          } catch (InterruptedException ex) {
+            // nothing
+          }
+        }
+
+      } catch (IOException ex) {
+        logger.debug(ex.toString());
+      } finally {
+        try {
+          if (br != null) {
+            br.close();
+          }
+          if (in != null) {
+            in.close();
+          }
+          if (fstream != null) {
+            fstream.close();
+          }
+        } catch (IOException exc) {
+          // nothing
         }
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
index 62c41d3..30aab10 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordCountWriter.java
@@ -40,7 +40,7 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje
   private static final String nl = System.lineSeparator();
 
   private String fileName;    // current file name
-  private transient final StringBuilder sb = new StringBuilder();
+  private final transient StringBuilder sb = new StringBuilder();
 
   /**
    * {@inheritDoc}
@@ -83,15 +83,17 @@ public class WordCountWriter extends AbstractFileOutputOperator<Map<String, Obje
 
     // get first and only pair; key is the fileName and is ignored here
     final Map.Entry<String, Object> entry = tuple.entrySet().iterator().next();
-    final List<WCPair> list = (List<WCPair>) entry.getValue();
+    final List<WCPair> list = (List<WCPair>)entry.getValue();
 
     if (sb.length() > 0) {        // clear buffer
       sb.delete(0, sb.length());
     }
 
     for ( WCPair pair : list ) {
-      sb.append(pair.word); sb.append(" : ");
-      sb.append(pair.freq); sb.append(nl);
+      sb.append(pair.word);
+      sb.append(" : ");
+      sb.append(pair.freq);
+      sb.append(nl);
     }
 
     final String data = sb.toString();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
index 56d9294..58c44b4 100644
--- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
+++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/WordReader.java
@@ -46,8 +46,8 @@ public class WordReader extends BaseOperator
   /**
    * Input port on which lines from the current file are received
    */
-  public final transient DefaultInputPort<String>
-    input = new DefaultInputPort<String>() {
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
 
     @Override
     public void process(String line)
@@ -55,7 +55,9 @@ public class WordReader extends BaseOperator
       // line; split it into words and emit them
       final String[] words = nonWord.split(line);
       for (String word : words) {
-        if (word.isEmpty()) continue;
+        if (word.isEmpty()) {
+          continue;
+        }
         output.emit(word);
       }
     }
@@ -65,7 +67,8 @@ public class WordReader extends BaseOperator
    * Returns the regular expression that matches strings between words
    * @return Regular expression for strings that separate words
    */
-  public String getNonWordStr() {
+  public String getNonWordStr()
+  {
     return nonWordStr;
   }
 
@@ -73,7 +76,8 @@ public class WordReader extends BaseOperator
    * Sets the regular expression that matches strings between words
    * @param regex New regular expression for strings that separate words
    */
-  public void setNonWordStr(String regex) {
+  public void setNonWordStr(String regex)
+  {
     nonWordStr = regex;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
index 6fc0dfa..1df0459 100644
--- a/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
+++ b/demos/wordcount/src/test/java/com/datatorrent/demos/wordcount/ApplicationTest.java
@@ -18,16 +18,18 @@
  */
 package com.datatorrent.demos.wordcount;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.demos.wordcount.Application;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
 
 /**
  *
  */
 public class ApplicationTest
 {
+  private final transient Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
   public ApplicationTest()
   {
   }
@@ -36,14 +38,14 @@ public class ApplicationTest
   public void testSomeMethod() throws Exception
   {
     LocalMode lma = LocalMode.newInstance();
-    Configuration conf =new Configuration(false);
+    Configuration conf = new Configuration(false);
     conf.addResource("dt-site-wordcount.xml");
     lma.prepareDAG(new Application(), conf);
     LocalMode.Controller lc = lma.getController();
     long start = System.currentTimeMillis();
     lc.run(300000);
     long end = System.currentTimeMillis();
-    long time = end -start;
-    System.out.println("Test used "+time+" ms");
+    long time = end - start;
+    LOG.debug("Test used " + time + " ms");
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
index 1da5b6c..50b306d 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQL.java
@@ -18,28 +18,29 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
-import com.datatorrent.api.StreamingApplication;
+import org.apache.hadoop.conf.Configuration;
+
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator;
 import com.datatorrent.lib.streamquery.DerbySqlStreamOperator;
 
-import org.apache.hadoop.conf.Configuration;
-
 /**
  * This demo will output the stock market data from yahoo finance
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="YahooFinanceWithDerbySQLDemo")
+@ApplicationAnnotation(name = "YahooFinanceWithDerbySQLDemo")
 public class ApplicationWithDerbySQL implements StreamingApplication
 {
   @Override
-  public void populateDAG(DAG dag, Configuration conf) {
-      String symbolStr = conf.get(ApplicationWithDerbySQL.class.getName() + ".tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM");
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    String symbolStr = conf.get(ApplicationWithDerbySQL.class.getName() + ".tickerSymbols", "YHOO,GOOG,AAPL,FB,AMZN,NFLX,IBM");
 
-      String[] symbols = symbolStr.split(",");
+    String[] symbols = symbolStr.split(",");
 
     YahooFinanceCSVInputOperator input1 = dag.addOperator("input1", new YahooFinanceCSVInputOperator());
     YahooFinanceCSVInputOperator input2 = dag.addOperator("input2", new YahooFinanceCSVInputOperator());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
index b658575..01e3ce9 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/StockTickInput.java
@@ -18,26 +18,33 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
-import au.com.bytecode.opencsv.CSVReader;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.datatorrent.lib.util.KeyValPair;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
 import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.cookie.CookiePolicy;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.params.DefaultHttpParams;
 import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.KeyValPair;
+
+import au.com.bytecode.opencsv.CSVReader;
 
 /**
  * This operator sends price, volume and time into separate ports and calculates incremental volume.
@@ -86,14 +93,14 @@ public class StockTickInput implements InputOperator
   private String prepareURL()
   {
     String str = "http://download.finance.yahoo.com/d/quotes.csv?s=";
-      for (int i = 0; i < symbols.length; i++) {
-        if (i != 0) {
-          str += ",";
-        }
-        str += symbols[i];
+    for (int i = 0; i < symbols.length; i++) {
+      if (i != 0) {
+        str += ",";
       }
-      str += "&f=sl1vt1&e=.csv";
-      return str;
+      str += symbols[i];
+    }
+    str += "&f=sl1vt1&e=.csv";
+    return str;
   }
 
   @Override
@@ -118,8 +125,7 @@ public class StockTickInput implements InputOperator
       int statusCode = client.executeMethod(method);
       if (statusCode != HttpStatus.SC_OK) {
         logger.error("Method failed: " + method.getStatusLine());
-      }
-      else {
+      } else {
         InputStream istream = method.getResponseBodyAsStream();
         // Process response
         InputStreamReader isr = new InputStreamReader(istream);
@@ -150,11 +156,9 @@ public class StockTickInput implements InputOperator
         }
       }
       Thread.sleep(readIntervalMillis);
-    }
-    catch (InterruptedException ex) {
+    } catch (InterruptedException ex) {
       logger.debug(ex.toString());
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       logger.debug(ex.toString());
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
index debf91d..a6aaece 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
@@ -18,7 +18,7 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
-
+import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
@@ -32,7 +32,6 @@ import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
 import com.datatorrent.lib.stream.ConsolidatorKeyVal;
 import com.datatorrent.lib.util.BaseKeyValueOperator.DefaultPartitionCodec;
 import com.datatorrent.lib.util.HighLow;
-import org.apache.hadoop.conf.Configuration;
 
 /**
  * Yahoo! Finance Application Demo :<br>
@@ -191,7 +190,7 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @since 0.3.2
  */
-@ApplicationAnnotation(name="YahooFinanceDemo")
+@ApplicationAnnotation(name = "YahooFinanceDemo")
 public class YahooFinanceApplication implements StreamingApplication
 {
   protected int streamingWindowSizeMilliSeconds = 1000; // 1 second
@@ -329,8 +328,8 @@ public class YahooFinanceApplication implements StreamingApplication
   /**
    * Populate Yahoo Finance Demo Application DAG.
    */
-       @SuppressWarnings("unchecked")
-       @Override
+  @SuppressWarnings("unchecked")
+  @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
index 0cdbfbb..cf3801e 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceCSVInputOperator.java
@@ -18,23 +18,26 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
-import au.com.bytecode.opencsv.CSVReader;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.commons.httpclient.cookie.CookiePolicy;
 import org.apache.commons.httpclient.methods.GetMethod;
 import org.apache.commons.httpclient.params.DefaultHttpParams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
+
+import au.com.bytecode.opencsv.CSVReader;
 
 /**
  * Grabs Yahoo Finance quotes data and emits HashMap, with key equals the format name (e.g. "s0") <p>
@@ -103,7 +106,7 @@ public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<
       if (i != 0) {
         str += ",";
       }
-       str += symbolList.get(i);
+      str += symbolList.get(i);
     }
     str += "&f=";
     for (String format: parameterList) {
@@ -129,9 +132,8 @@ public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<
       try {
         int statusCode = client.executeMethod(method);
         if (statusCode != HttpStatus.SC_OK) {
-          System.err.println("Method failed: " + method.getStatusLine());
-        }
-        else {
+          logger.error("Method failed: " + method.getStatusLine());
+        } else {
           InputStream istream;
           istream = method.getResponseBodyAsStream();
           // Process response
@@ -148,11 +150,9 @@ public class YahooFinanceCSVInputOperator extends SimpleSinglePortInputOperator<
           }
         }
         Thread.sleep(readIntervalMillis);
-      }
-      catch (InterruptedException ex) {
+      } catch (InterruptedException ex) {
         logger.debug(ex.toString());
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         logger.debug(ex.toString());
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
index 587f9de..c038e61 100644
--- a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
+++ b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationTest.java
@@ -18,10 +18,9 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.demos.yahoofinance.YahooFinanceApplication;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
 
 /**
  * Run Yahoo Finance application demo.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
----------------------------------------------------------------------
diff --git a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
index b430f92..7b134f5 100644
--- a/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
+++ b/demos/yahoofinance/src/test/java/com/datatorrent/demos/yahoofinance/ApplicationWithDerbySQLTest.java
@@ -18,16 +18,18 @@
  */
 package com.datatorrent.demos.yahoofinance;
 
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.demos.yahoofinance.ApplicationWithDerbySQL;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
 
 /**
  *
  */
 public class ApplicationWithDerbySQLTest
 {
+  private final transient Logger LOG = LoggerFactory.getLogger(ApplicationWithDerbySQLTest.class);
   public ApplicationWithDerbySQLTest()
   {
   }
@@ -42,7 +44,7 @@ public class ApplicationWithDerbySQLTest
     long start = System.currentTimeMillis();
     lc.run();
     long end = System.currentTimeMillis();
-    long time = end -start;
-    System.out.println("Test used "+time+" ms");
+    long time = end - start;
+    LOG.debug("Test used " + time + " ms");
   }
 }

Reply via email to