http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/META-INF/properties.xml 
b/examples/twitter/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..3f6e74d
--- /dev/null
+++ b/examples/twitter/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,121 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+<!-- properties for rolling top words example -->
+<configuration>
+  <property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1256</value>
+  </property>
+  <!-- default operator size 256MB -->
+  <property>
+    <name>dt.application.*.operator.*.attr.MEMORY_MB</name>
+    <value>256</value>
+  </property>
+  <property>
+    <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name>
+    <value>-Xmx128M</value>
+  </property>
+
+  <!-- default buffer memory 256MB -->
+  <property>
+    <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name>
+    <value>256</value>
+  </property>
+  <property>
+    <name>dt.operator.TweetSampler.consumerKey</name>
+  </property>
+  <property>
+    <name>dt.operator.TweetSampler.consumerSecret</name>
+  </property>
+  <property>
+    <name>dt.operator.TweetSampler.accessToken</name>
+  </property>
+  <property>
+    <name>dt.operator.TweetSampler.accessTokenSecret</name>
+  </property>
+  <property>
+    <name>dt.operator.TweetSampler.feedMultiplierVariance</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>dt.operator.TweetSampler.feedMultiplier</name>
+    <value>20</value>
+  </property>
+
+  <!-- RollingTopWordsExample -->
+
+  <property>
+    
<name>dt.application.RollingTopWordsExample.operator.TopCounter.topCount</name>
+    <value>10</value>
+  </property>
+  <property>
+    
<name>dt.application.RollingTopWordsExample.stream.TweetStream.locality</name>
+    <value>CONTAINER_LOCAL</value>
+  </property>
+  <property>
+    
<name>dt.application.RollingTopWordsExample.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+    <value>TwitterWordsQuery</value>
+  </property>
+  <property>
+    
<name>dt.application.RollingTopWordsExample.operator.QueryResult.topic</name>
+    <value>TwitterWordsQueryResult</value>
+  </property>
+  <property>
+    
<name>dt.application.RollingTopWordsExample.operator.QueryResult.numRetries</name>
+    <value>2147483647</value>
+  </property>
+
+  <!-- TwitterExample -->
+
+  <property>
+    
<name>dt.application.TwitterExample.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+    <value>TwitterURLQuery</value>
+  </property>
+  <property>
+    <name>dt.application.TwitterExample.operator.QueryResult.topic</name>
+    <value>TwitterURLQueryResult</value>
+  </property>
+  <property>
+    <name>dt.application.TwitterExample.operator.QueryResult.numRetries</name>
+    <value>2147483647</value>
+  </property>
+  <property>
+    
<name>dt.application.TwitterExample.operator.UniqueURLCounter.attr.APPLICATION_WINDOW_COUNT</name>
+    <value>60</value>
+  </property>
+
+  <!-- TwitterTrendingExample -->
+
+  <property>
+    
<name>dt.application.TwitterTrendingExample.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name>
+    <value>TwitterHashtagQueryExample</value>
+  </property>
+  <property>
+    
<name>dt.application.TwitterTrendingExample.operator.QueryResult.topic</name>
+    <value>TwitterHashtagQueryResultExample</value>
+  </property>
+  <property>
+    
<name>dt.application.TwitterTrendingExample.operator.QueryResult.numRetries</name>
+    <value>2147483647</value>
+  </property>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/mysql.sql
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/mysql.sql 
b/examples/twitter/src/main/resources/mysql.sql
new file mode 100644
index 0000000..e0b97dd
--- /dev/null
+++ b/examples/twitter/src/main/resources/mysql.sql
@@ -0,0 +1,35 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+--
+
+DROP TABLE if exists tweets;
+CREATE TABLE tweets (
+   window_id LONG NOT NULL,
+   creation_date DATE,
+   text VARCHAR(256) NOT NULL,
+   userid VARCHAR(40) NOT NULL,
+   KEY ( userid, creation_date)
+   );
+
+drop table if exists dt_window_id_tracker;
+CREATE TABLE dt_window_id_tracker (
+  dt_application_id VARCHAR(100) NOT NULL,
+  dt_operator_id int(11) NOT NULL,
+  dt_window_id bigint NOT NULL,
+  UNIQUE (dt_application_id, dt_operator_id, dt_window_id)
+)  ENGINE=MyISAM DEFAULT CHARSET=latin1;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/top_urls.tplg.properties
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/top_urls.tplg.properties 
b/examples/twitter/src/main/resources/top_urls.tplg.properties
new file mode 100644
index 0000000..c106d7d
--- /dev/null
+++ b/examples/twitter/src/main/resources/top_urls.tplg.properties
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+stram.node.twitterfeed.classname=com.datatorrent.example.twitter.TwitterSampleInput
+
+stram.stream.status.source=twitterfeed.output
+stram.stream.status.sinks=urlextractor.input
+
+stram.node.urlextractor.classname=com.datatorrent.example.twitter.TwitterStatusURLExtractor
+
+stram.stream.collapsedurls.source=urlextractor.output
+stram.stream.collapsedurls.sinks=
+
+stram.node.
+stram.stream.partitionedtf.input=twitterfeed.output
+stram.stream.partitionedtf.output=partitioned_counter.input
+stram.stream.partitionedtf.serdeClassname=com.datatorrent.example.twitter.URLSerDe
+
+stram.node.partitioned_counter.classname=com.datatorrent.example.twitter.PartitionedCounter
+stram.node.partitioned_counter.topCount=10
+
+stram.stream.merge_stream.input=partitioned_counter.output
+stram.stream.merge_stream.output=merge_counter.input
+stram.stream.merge_stream.serdeClassname=com.datatorrent.example.twitter.URLHolderSerde
+
+stram.node.merge_counter.classname=com.datatorrent.example.twitter.MergeSorter
+stram.node.merge_counter.topCount=10
+
+stram.stream.merged_stream.input=merge_counter.output
+stram.stream.merged_stream.output=console.input
+
+stram.node.console.classname=com.datatorrent.stream.ConsoleOutputStream

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/twitterHashTagDataSchema.json
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/twitterHashTagDataSchema.json 
b/examples/twitter/src/main/resources/twitterHashTagDataSchema.json
new file mode 100644
index 0000000..0c9296c
--- /dev/null
+++ b/examples/twitter/src/main/resources/twitterHashTagDataSchema.json
@@ -0,0 +1,4 @@
+{
+  "values": [{"name": "hashtag", "type": "string"},
+             {"name": "count", "type": "integer"}]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/twitterURLDataSchema.json
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/twitterURLDataSchema.json 
b/examples/twitter/src/main/resources/twitterURLDataSchema.json
new file mode 100644
index 0000000..ecf723e
--- /dev/null
+++ b/examples/twitter/src/main/resources/twitterURLDataSchema.json
@@ -0,0 +1,4 @@
+{
+  "values": [{"name": "url", "type": "string"},
+             {"name": "count", "type": "integer"}]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/twitterWordDataSchema.json
----------------------------------------------------------------------
diff --git a/examples/twitter/src/main/resources/twitterWordDataSchema.json 
b/examples/twitter/src/main/resources/twitterWordDataSchema.json
new file mode 100644
index 0000000..5e8e7c0
--- /dev/null
+++ b/examples/twitter/src/main/resources/twitterWordDataSchema.json
@@ -0,0 +1,4 @@
+{
+  "values": [{"name": "word", "type": "string"},
+             {"name": "count", "type": "integer"}]
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java
 
b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java
new file mode 100644
index 0000000..fcde9ef
--- /dev/null
+++ 
b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.junit.Assert.assertEquals;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test for the application which taps into the twitter's sample input stream 
and
+ * dumps all the tweets into  a database.
+ */
+public class TwitterDumpApplicationTest
+{
+  @Test
+  public void testPopulateDAG() throws Exception
+  {
+    Configuration configuration = new Configuration(false);
+
+    LocalMode lm = LocalMode.newInstance();
+    DAG prepareDAG = lm.prepareDAG(new TwitterDumpApplication(), 
configuration);
+    DAG clonedDAG = lm.cloneDAG();
+
+    assertEquals("Serialization", prepareDAG, clonedDAG);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java
----------------------------------------------------------------------
diff --git 
a/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java
 
b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java
new file mode 100644
index 0000000..628a807
--- /dev/null
+++ 
b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class TwitterTopCounterTest
+{
+  /**
+   * This test requires twitter authentication setup and is skipped by default
+   * (see {@link TwitterSampleInput}).
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    new TwitterTopCounterApplication().populateDAG(lma.getDAG(), new 
Configuration(false));
+    LocalMode.Controller lc = lma.getController();
+    lc.run(120000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java
----------------------------------------------------------------------
diff --git 
a/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java
 
b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java
new file mode 100644
index 0000000..e888eb3
--- /dev/null
+++ 
b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.twitter;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.contrib.twitter.TwitterSampleInput;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class TwitterTopWordsTest
+{
+  /**
+   * This test requires twitter authentication setup and is skipped by default
+   * (see {@link TwitterSampleInput}).
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testApplication() throws Exception
+  {
+    TwitterTopWordsApplication app = new TwitterTopWordsApplication();
+    Configuration conf = new Configuration(false);
+    conf.addResource("dt-site-rollingtopwords.xml");
+    LocalMode lma = LocalMode.newInstance();
+    lma.prepareDAG(app, conf);
+    LocalMode.Controller lc = lma.getController();
+    lc.run(120000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml
----------------------------------------------------------------------
diff --git a/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml 
b/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml
new file mode 100644
index 0000000..fa9b534
--- /dev/null
+++ b/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+<property>
+    <name>dt.attr.MASTER_MEMORY_MB</name>
+    <value>1024</value>
+  </property>
+       <property>
+               <name>dt.application.RollingTopWordsExample.class</name>
+               
<value>org.apache.apex.examples.twitter.TwitterTopWordsApplication</value>
+               <description>An alias for the application</description>
+       </property>
+       <property>
+               
<name>dt.application.RollingTopWordsExample.operator.TopCounter.topCount
+               </name>
+               <value>10</value>
+       </property>
+       <property>
+               
<name>dt.application.RollingTopWordsExample.stream.TweetStream.locality
+               </name>
+               <value>CONTAINER_LOCAL</value>
+       </property>
+       <property>
+               
<name>dt.application.RollingTopWordsExample.stream.TwittedWords.locality
+               </name>
+               <value></value>
+       </property>
+       <property>
+               
<name>dt.application.RollingTopWordsExample.stream.UniqueWordCounts.locality
+               </name>
+               <value></value>
+       </property>
+       <property>
+               
<name>dt.application.RollingTopWordsExample.stream.TopWords.locality</name>
+               <value></value>
+       </property>
+       <property>
+               <name>dt.operator.TweetSampler.consumerKey</name>
+               <value>r1DqM35iCTjbgLHf1R7rDbF5R</value>
+       </property>
+       <property>
+               <name>dt.operator.TweetSampler.consumerSecret</name>
+               
<value>KBpZiR0glPzvZPm1Sa7sq9MCGQ2H2DrVChNtmYQcU75fwuHjed</value>
+       </property>
+       <property>
+               <name>dt.operator.TweetSampler.accessToken</name>
+               
<value>2490355837-lXKev9vIGzftDjDu9LlyQiuGAqjYyPELFRyRpQo</value>
+       </property>
+       <property>
+               <name>dt.operator.TweetSampler.accessTokenSecret</name>
+               <value>lmAxZlFhcBqeTxmyatFT43fzshzrv6lsOAtHDsCBLjiuk</value>
+       </property>
+
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/twitter/src/test/resources/log4j.properties 
b/examples/twitter/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/twitter/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M 
- %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - 
%m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - 
%m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/pom.xml
----------------------------------------------------------------------
diff --git a/examples/uniquecount/pom.xml b/examples/uniquecount/pom.xml
new file mode 100644
index 0000000..37e4bb1
--- /dev/null
+++ b/examples/uniquecount/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>malhar-examples-uniquecount</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Unique Count Example</name>
+  <description></description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.1</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/uniquecount/src/assemble/appPackage.xml 
b/examples/uniquecount/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/uniquecount/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java
new file mode 100644
index 0000000..fad3718
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.stream.Counter;
+import com.datatorrent.lib.stream.StreamDuplicater;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/**
+ * Application to demonstrate PartitionableUniqueCount operator. <br>
+ * The input operator generate random keys, which is sent to
+ * PartitionableUniqueCount operator initially partitioned into three 
partitions to
+ * test unifier functionality, and output of the operator is sent to verifier 
to verify
+ * that it generates correct result.
+ *
+ * @since 1.0.2
+ */
+@ApplicationAnnotation(name = "UniqueValueCountExample")
+public class Application implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+        /* Generate random key-value pairs */
+    RandomKeysGenerator randGen = dag.addOperator("randomgen", new 
RandomKeysGenerator());
+
+
+        /* Initialize with three partition to start with */
+    // UniqueCount1 uniqCount = dag.addOperator("uniqevalue", new 
UniqueCount1());
+    UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new 
UniqueCounter<Integer>());
+
+    MapToKeyHashValuePairConverter<Integer, Integer> converter = 
dag.addOperator("converter", new MapToKeyHashValuePairConverter());
+
+    uniqCount.setCumulative(false);
+    dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<UniqueCounter<Integer>>(3));
+
+    CountVerifier<Integer> verifier = dag.addOperator("verifier", new 
CountVerifier<Integer>());
+    StreamDuplicater<KeyHashValPair<Integer, Integer>> dup = 
dag.addOperator("dup", new StreamDuplicater<KeyHashValPair<Integer, 
Integer>>());
+    ConsoleOutputOperator output = dag.addOperator("output", new 
ConsoleOutputOperator());
+
+    ConsoleOutputOperator successOutput = dag.addOperator("successoutput", new 
ConsoleOutputOperator());
+    successOutput.setStringFormat("Success %d");
+    ConsoleOutputOperator failureOutput = dag.addOperator("failureoutput", new 
ConsoleOutputOperator());
+    failureOutput.setStringFormat("Failure %d");
+
+    // success and failure counters.
+    Counter successcounter = dag.addOperator("successcounter", new Counter());
+    Counter failurecounter = dag.addOperator("failurecounter", new Counter());
+
+    dag.addStream("datain", randGen.outPort, uniqCount.data);
+    dag.addStream("dataverification0", randGen.verificationPort, verifier.in1);
+    dag.addStream("convert", uniqCount.count, 
converter.input).setLocality(Locality.THREAD_LOCAL);
+    dag.addStream("split", converter.output, dup.data);
+    dag.addStream("consoutput", dup.out1, output.input);
+    dag.addStream("dataverification1", dup.out2, verifier.in2);
+    dag.addStream("successc", verifier.successPort, successcounter.input);
+    dag.addStream("failurec", verifier.failurePort, failurecounter.input);
+    dag.addStream("succconsoutput", successcounter.output, 
successOutput.input);
+    dag.addStream("failconsoutput", failurecounter.output, 
failureOutput.input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java
new file mode 100644
index 0000000..422807e
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/*
+Compare results and print non-matching values to console.
+ */
+/**
+ * <p>CountVerifier class.</p>
+ *
+ * @since 1.0.2
+ */
+public class CountVerifier<K> implements Operator
+{
+  HashMap<K, Integer> map1 = new HashMap<K, Integer>();
+  HashMap<K, Integer> map2 = new HashMap<K, Integer>();
+
+  public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = 
new DefaultInputPort<KeyHashValPair<K, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K, Integer> tuple)
+    {
+      processTuple(tuple, map1);
+    }
+  };
+
+  public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = 
new DefaultInputPort<KeyHashValPair<K, Integer>>()
+  {
+    @Override
+    public void process(KeyHashValPair<K, Integer> tuple)
+    {
+      processTuple(tuple, map2);
+    }
+  };
+
+  void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map)
+  {
+    map.put(tuple.getKey(), tuple.getValue());
+  }
+
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> successPort = new 
DefaultOutputPort<Integer>();
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> failurePort = new 
DefaultOutputPort<Integer>();
+
+  @Override
+  public void beginWindow(long l)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+    int failureCount = 0;
+    for (Map.Entry<K, Integer> e : map1.entrySet()) {
+      K key = e.getKey();
+      int val = map2.get(key);
+      if (val != e.getValue()) {
+        failureCount++;
+      }
+    }
+    if (failureCount != 0) {
+      failurePort.emit(failureCount);
+    } else {
+      successPort.emit(map1.size());
+    }
+  }
+
+  @Override
+  public void setup(Context.OperatorContext operatorContext)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java
new file mode 100644
index 0000000..3e29a70
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import java.util.HashMap;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Generate random Key value pairs.
+ * key is string and value is int, it emits the pair as KeyValPair on outPort,
+ *
+ * @since 1.0.2
+ */
+public class RandomDataGenerator implements InputOperator
+{
+  public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort 
= new DefaultOutputPort<KeyValPair<String, Object>>();
+  private HashMap<String, Integer> dataInfo;
+  private final transient Logger LOG = 
LoggerFactory.getLogger(RandomDataGenerator.class);
+  private int count;
+  private int sleepMs = 10;
+  private int keyRange = 100;
+  private int valueRange = 100;
+  private long tupleBlast = 10000;
+  private Random random;
+
+  public RandomDataGenerator()
+  {
+    random = new Random();
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    for (int i = 0; i < tupleBlast; i++) {
+      String key = String.valueOf(random.nextInt(keyRange));
+      int val = random.nextInt(valueRange);
+      outPort.emit(new KeyValPair<String, Object>(key, val));
+    }
+    try {
+      Thread.sleep(sleepMs);
+    } catch (Exception ex) {
+      LOG.error(ex.getMessage());
+    }
+    count++;
+  }
+
+  public int getSleepMs()
+  {
+    return sleepMs;
+  }
+
+  public void setSleepMs(int sleepMs)
+  {
+    this.sleepMs = sleepMs;
+  }
+
+  public long getTupleBlast()
+  {
+    return tupleBlast;
+  }
+
+  public void setTupleBlast(long tupleBlast)
+  {
+    this.tupleBlast = tupleBlast;
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+    LOG.debug("emitTuples called  " + count + " times in this window");
+    count = 0;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext operatorContext)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java
new file mode 100644
index 0000000..6d39c37
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Input port operator for generating random values on keys. <br>
+ * Key(s)   : key + integer in range between 0 and numKeys <br>
+ * Value(s) : integer in range of 0 to numValuesPerKeys <br>
+ *
+ * @since 0.9.3
+ */
+public class RandomKeyValues implements InputOperator
+{
+  public final transient DefaultOutputPort<KeyValPair<String, Object>> outport 
= new DefaultOutputPort<KeyValPair<String, Object>>();
+  private Random random = new Random(11111);
+  private int numKeys;
+  private int numValuesPerKeys;
+  private int tuppleBlast = 1000;
+  private int emitDelay = 20; /* 20 ms */
+
+  /* For verification */
+  private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>();
+
+  public RandomKeyValues()
+  {
+    this.numKeys = 100;
+    this.numValuesPerKeys = 100;
+  }
+
+  public RandomKeyValues(int keys, int values)
+  {
+    this.numKeys = keys;
+    this.numValuesPerKeys = values;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    /* generate tuples randomly, */
+    for (int i = 0; i < tuppleBlast; i++) {
+      int intKey = random.nextInt(numKeys);
+      String key = "key" + String.valueOf(intKey);
+      int value = random.nextInt(numValuesPerKeys);
+
+      // update history for verifying later.
+      BitSet bmap = history.get(intKey);
+      if (bmap == null) {
+        bmap = new BitSet();
+        history.put(intKey, bmap);
+      }
+      bmap.set(value);
+
+      // emit the key with value.
+      outport.emit(new KeyValPair<String, Object>(key, value));
+    }
+    try {
+      Thread.sleep(emitDelay);
+    } catch (Exception e) {
+      // Ignore.
+    }
+  }
+
+  public int getNumKeys()
+  {
+    return numKeys;
+  }
+
+  public void setNumKeys(int numKeys)
+  {
+    this.numKeys = numKeys;
+  }
+
+  public int getNumValuesPerKeys()
+  {
+    return numValuesPerKeys;
+  }
+
+  public void setNumValuesPerKeys(int numValuesPerKeys)
+  {
+    this.numValuesPerKeys = numValuesPerKeys;
+  }
+
+  public int getTuppleBlast()
+  {
+    return tuppleBlast;
+  }
+
+  public void setTuppleBlast(int tuppleBlast)
+  {
+    this.tuppleBlast = tuppleBlast;
+  }
+
+  public int getEmitDelay()
+  {
+    return emitDelay;
+  }
+
+  public void setEmitDelay(int emitDelay)
+  {
+    this.emitDelay = emitDelay;
+  }
+
+  public void debug()
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java
new file mode 100644
index 0000000..9f26b4e
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.lib.util.KeyHashValPair;
+
+/*
+    Generate random keys.
+ */
+/**
+ * <p>RandomKeysGenerator class.</p>
+ *
+ * @since 1.0.2
+ */
+public class RandomKeysGenerator implements InputOperator
+{
+
+  protected int numKeys = 100;
+  protected int tupleBlast = 15000;
+  protected long sleepTime = 0;
+  protected Map<Integer, MutableInt> history = new HashMap<Integer, 
MutableInt>();
+  private Random random = new Random();
+  private Date date = new Date();
+  private long start;
+
+  @OutputPortFieldAnnotation(optional = false)
+  public transient DefaultOutputPort<Integer> outPort = new 
DefaultOutputPort<Integer>();
+
+  @OutputPortFieldAnnotation(optional = true)
+  public transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> 
verificationPort =
+      new DefaultOutputPort<KeyHashValPair<Integer, Integer>>();
+
+  @Override
+  public void emitTuples()
+  {
+    for (int i = 0; i < tupleBlast; i++) {
+      int key = random.nextInt(numKeys);
+      outPort.emit(key);
+
+
+      if (verificationPort.isConnected()) {
+        // maintain history for later verification.
+        MutableInt count = history.get(key);
+        if (count == null) {
+          count = new MutableInt(0);
+          history.put(key, count);
+        }
+        count.increment();
+      }
+
+    }
+    try {
+      if (sleepTime != 0) {
+        Thread.sleep(sleepTime);
+      }
+    } catch (Exception ex) {
+      // Ignore.
+    }
+  }
+
+  public RandomKeysGenerator()
+  {
+    start = date.getTime();
+  }
+
+  @Override
+  public void beginWindow(long l)
+  {
+
+  }
+
+  @Override
+  public void endWindow()
+  {
+
+    if (verificationPort.isConnected()) {
+      for (Map.Entry<Integer, MutableInt> e : history.entrySet()) {
+        verificationPort.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), 
e.getValue().toInteger()));
+      }
+      history.clear();
+    }
+
+  }
+
+  @Override
+  public void setup(Context.OperatorContext operatorContext)
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  public int getNumKeys()
+  {
+    return numKeys;
+  }
+
+  public void setNumKeys(int numKeys)
+  {
+    this.numKeys = numKeys;
+  }
+
+  public int getTupleBlast()
+  {
+    return tupleBlast;
+  }
+
+  public void setTupleBlast(int tupleBlast)
+  {
+    this.tupleBlast = tupleBlast;
+  }
+
+  public long getSleepTime()
+  {
+    return sleepTime;
+  }
+
+  public void setSleepTime(long sleepTime)
+  {
+    this.sleepTime = sleepTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java
new file mode 100644
index 0000000..ff9fdf0
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * <p>UniqueKeyValCountExample class.</p>
+ *
+ * @since 1.0.2
+ */
+@ApplicationAnnotation(name = "UniqueKeyValueCountExample")
+public class UniqueKeyValCountExample implements StreamingApplication
+{
+
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+        /* Generate random key-value pairs */
+    RandomDataGenerator randGen = dag.addOperator("randomgen", new 
RandomDataGenerator());
+
+        /* Initialize with three partition to start with */
+    UniqueCounter<KeyValPair<String, Object>> uniqCount =
+        dag.addOperator("uniqevalue", new UniqueCounter<KeyValPair<String, 
Object>>());
+    MapToKeyHashValuePairConverter<KeyValPair<String, Object>, Integer> 
converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
+    uniqCount.setCumulative(false);
+    dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new 
StatelessPartitioner<UniqueCounter<KeyValPair<String, Object>>>(3));
+
+    ConsoleOutputOperator output = dag.addOperator("output", new 
ConsoleOutputOperator());
+
+    dag.addStream("datain", randGen.outPort, uniqCount.data);
+    dag.addStream("convert", uniqCount.count, 
converter.input).setLocality(Locality.THREAD_LOCAL);
+    dag.addStream("consoutput", converter.output, output.input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java
 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java
new file mode 100644
index 0000000..713cfb9
--- /dev/null
+++ 
b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+  Example Application for new Paritionable UniqueCount Operator.
+ */
+package org.apache.apex.examples.uniquecount;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/resources/META-INF/properties.xml
----------------------------------------------------------------------
diff --git a/examples/uniquecount/src/main/resources/META-INF/properties.xml 
b/examples/uniquecount/src/main/resources/META-INF/properties.xml
new file mode 100644
index 0000000..8742328
--- /dev/null
+++ b/examples/uniquecount/src/main/resources/META-INF/properties.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<configuration>
+  <!-- 
+  <property>
+    <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
+    <value>some-default-value (if value is not specified, it is required from 
the user or custom config when launching)</value>
+  </property>
+  -->
+</configuration>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java
 
b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java
new file mode 100644
index 0000000..95242ee
--- /dev/null
+++ 
b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class ApplicationTest
+{
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    new Application().populateDAG(lma.getDAG(), new Configuration(false));
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java
----------------------------------------------------------------------
diff --git 
a/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java
 
b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java
new file mode 100644
index 0000000..6492c9d
--- /dev/null
+++ 
b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.uniquecount;
+
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.LocalMode;
+
+/**
+ * Test the DAG declaration in local mode.
+ */
+public class UniqueKeyValExampleTest
+{
+  @Test
+  public void testApplication() throws Exception
+  {
+    LocalMode lma = LocalMode.newInstance();
+    new UniqueKeyValCountExample().populateDAG(lma.getDAG(), new 
Configuration(false));
+    LocalMode.Controller lc = lma.getController();
+    lc.run(10000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/uniquecount/src/test/resources/log4j.properties 
b/examples/uniquecount/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cf0d19e
--- /dev/null
+++ b/examples/uniquecount/src/test/resources/log4j.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M 
- %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=DEBUG
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - 
%m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - 
%m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
+log4j.logger.org.apache.apex=debug

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/pom.xml
----------------------------------------------------------------------
diff --git a/examples/wordcount/pom.xml b/examples/wordcount/pom.xml
new file mode 100644
index 0000000..9106c7c
--- /dev/null
+++ b/examples/wordcount/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  
+  <artifactId>malhar-examples-wordcount</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Apex Malhar Wordcount Example</name>
+  <description>A very simple application that demonstrates Apex Platform’s 
streaming window feature.</description>
+
+  <parent>
+    <groupId>org.apache.apex</groupId>
+    <artifactId>malhar-examples</artifactId>
+    <version>3.7.0-SNAPSHOT</version>
+  </parent>
+
+  <properties>
+    <skipTests>true</skipTests>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+      <version>6.6.4</version>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/assemble/appPackage.xml
----------------------------------------------------------------------
diff --git a/examples/wordcount/src/assemble/appPackage.xml 
b/examples/wordcount/src/assemble/appPackage.xml
new file mode 100644
index 0000000..4138cf2
--- /dev/null
+++ b/examples/wordcount/src/assemble/appPackage.xml
@@ -0,0 +1,59 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
+  <id>appPackage</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${basedir}/target/</directory>
+      <outputDirectory>/app</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/target/deps</directory>
+      <outputDirectory>/lib</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/site/conf</directory>
+      <outputDirectory>/conf</outputDirectory>
+      <includes>
+        <include>*.xml</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/META-INF</directory>
+      <outputDirectory>/META-INF</outputDirectory>
+    </fileSet>
+    <fileSet>
+      <directory>${basedir}/src/main/resources/app</directory>
+      <outputDirectory>/app</outputDirectory>
+    </fileSet>
+  </fileSets>
+
+</assembly>
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java
----------------------------------------------------------------------
diff --git 
a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java
 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java
new file mode 100644
index 0000000..6652b79
--- /dev/null
+++ 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.wordcount;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+
+/**
+ * Simple Word Count Example : <br>
+ * This is application to count total occurrence of each word from file or any
+ * stream. <br>
+ * <br>
+ *
+ * Functional Description : <br>
+ * This example declares custom input operator to read data file set by user. 
<br>
+ * This input operator can be replaced by any stream input operator. <br>
+ * <br>
+ *
+ * Custom Attribute(s) : None <br>
+ * <br>
+ *
+ * Input Adapter : <br>
+ * Word input operator opens user specified data file and streams each line to
+ * application. <br>
+ * <br>
+ *
+ * Output Adapter : <br>
+ * Output values are written to console through ConsoleOutputOerator<br>
+ * If needed you can use other output adapters<br>
+ * <br>
+ * <p>
+ * Running Java Test or Main app in IDE:
+ *
+ * <pre>
+ *     LocalMode.runApp(new Application(), 600000); // 10 min run
+ * </pre>
+ *
+ * Run Success : <br>
+ * For successful deployment and run, user should see following output on 
console: <br>
+ * </pre>
+ * {developed=1} {bush’s=2} {roster=1} {council=1} {mankiw=1} {academia=1}
+ * {of=6} {help=1} {are=1} {presidential=1}
+ * </pre> <br> <br>
+ *
+ * Scaling Options : <br>
+ * This operator app can not be scaled, please look at implementation {@link 
com.datatorrent.lib.algo.UniqueCounterEach}  <br> <br>
+ *
+ * Application DAG : <br>
+ * <img src="doc-files/UniqueWordCounter.jpg" width=600px > <br>
+ *
+ * Streaming Window Size : 500ms
+ * Operator Details : <br>
+ * <ul>
+ * <li>
+ * <p><b> The operator wordinput : </b> This operator opens local file, reads 
each line and sends each word to application.
+ *         This can replaced by any input stream by user. <br>
+ *     Class : {@link WordCountInputOperator}  <br>
+ *     Operator Application Window Count : 1 <br>
+ *     StateFull : No
+ *  </li>
+ *  <li>
+ *     <p><b> The operator count : </b>  This operator aggregates unique key 
count  over one window count(app). <br>
+ *     Class : {@link com.datatorrent.lib.algo.UniqueCounterEach}  <br>
+ *     Operator Application Window Count : 1 <br>
+ *     StateFull : No
+ *  </li>
+ *  <li>
+ *      <p><b>The operator Console: </b> This operator just outputs the input 
tuples to  the console (or stdout).
+ *      This case it emits unique count of each word over 500ms.
+ *  </li>
+ * </ul>
+ *
+ * @since 0.3.2
+ */
+@ApplicationAnnotation(name = "WordCountExample")
+public class Application implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    WordCountInputOperator input = dag.addOperator("wordinput", new 
WordCountInputOperator());
+    UniqueCounter<String> wordCount = dag.addOperator("count", new 
UniqueCounter<String>());
+    dag.addStream("wordinput-count", input.outputPort, wordCount.data);
+    ConsoleOutputOperator consoleOperator = dag.addOperator("console", new 
ConsoleOutputOperator());
+    dag.addStream("count-console",wordCount.count, consoleOperator.input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
----------------------------------------------------------------------
diff --git 
a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
new file mode 100644
index 0000000..699469b
--- /dev/null
+++ 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.wordcount;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+
+import com.datatorrent.lib.appdata.schemas.SchemaUtils;
+import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery;
+import com.datatorrent.lib.io.PubSubWebSocketAppDataResult;
+
+/**
+ * Simple example that computes word frequencies from any file dropped into a
+ * monitored directory. It outputs the top N word-frequency pairs for each file
+ * as well globally across all files.
+ * <p>
+ * Each input file generates a corresponding output file in the output 
directory
+ * containing the top N pairs for that file. The output is also written
+ * to an internal store to support visualization in the UI via queries.
+ * <p>
+ * @since 3.2.0
+ */
+@ApplicationAnnotation(name = "TopNWordsWithQueries")
+public class ApplicationWithQuerySupport implements StreamingApplication
+{
+  private static final Logger LOG = 
LoggerFactory.getLogger(ApplicationWithQuerySupport.class);
+
+  /**
+   * Name of schema file.
+   */
+  public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json";
+
+  /**
+   * Populates the DAG with operators and connecting streams
+   *
+   * @param dag The directed acyclic graph of operators to populate
+   * @param conf The configuration
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // create operators
+    LineReader lineReader            = dag.addOperator("lineReader", new 
LineReader());
+    WordReader wordReader            = dag.addOperator("wordReader", new 
WordReader());
+    WindowWordCount windowWordCount  = dag.addOperator("windowWordCount", new 
WindowWordCount());
+    FileWordCount fileWordCount      = dag.addOperator("fileWordCount", new 
FileWordCount());
+    WordCountWriter wcWriter         = dag.addOperator("wcWriter", new 
WordCountWriter());
+    ConsoleOutputOperator console    = dag.addOperator("console", new 
ConsoleOutputOperator());
+    console.setStringFormat("wordCount: %s");
+
+    // create streams
+
+    dag.addStream("lines",   lineReader.output,  wordReader.input);
+    dag.addStream("control", lineReader.control, fileWordCount.control);
+    dag.addStream("words",   wordReader.output,  windowWordCount.input);
+    dag.addStream("windowWordCounts", windowWordCount.output, 
fileWordCount.input);
+    dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input);
+
+    String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
+
+    if (!StringUtils.isEmpty(gatewayAddress)) {        // add query support
+      URI uri = URI.create("ws://" + gatewayAddress + "/pubsub");
+
+      AppDataSnapshotServerMap snapshotServerFile
+          = dag.addOperator("snapshotServerFile", new 
AppDataSnapshotServerMap());
+      AppDataSnapshotServerMap snapshotServerGlobal
+          = dag.addOperator("snapshotServerGlobal", new 
AppDataSnapshotServerMap());
+
+      String snapshotServerJSON = 
SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA);
+      snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON);
+      snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON);
+
+      PubSubWebSocketAppDataQuery wsQueryFile = new 
PubSubWebSocketAppDataQuery();
+      PubSubWebSocketAppDataQuery wsQueryGlobal = new 
PubSubWebSocketAppDataQuery();
+      wsQueryFile.setUri(uri);
+      wsQueryGlobal.setUri(uri);
+
+      snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile);
+      snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal);
+
+      PubSubWebSocketAppDataResult wsResultFile
+          = dag.addOperator("wsResultFile", new 
PubSubWebSocketAppDataResult());
+      PubSubWebSocketAppDataResult wsResultGlobal
+          = dag.addOperator("wsResultGlobal", new 
PubSubWebSocketAppDataResult());
+      wsResultFile.setUri(uri);
+      wsResultGlobal.setUri(uri);
+
+      Operator.InputPort<String> queryResultFilePort = wsResultFile.input;
+      Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input;
+
+      dag.addStream("WordCountsFile", fileWordCount.outputPerFile, 
snapshotServerFile.input, console.input);
+      dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, 
snapshotServerGlobal.input);
+
+      dag.addStream("ResultFile", snapshotServerFile.queryResult, 
queryResultFilePort);
+      dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, 
queryResultGlobalPort);
+    } else {
+      //throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
+      dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input);
+    }
+
+    LOG.info("done with populateDAG, isDebugEnabled = " + 
LOG.isDebugEnabled());
+    LOG.info("Returning from populateDAG");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java
 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java
new file mode 100644
index 0000000..a30d0b0
--- /dev/null
+++ 
b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.wordcount;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Computes word frequencies per file and globally, and writes the top N pairs 
to an output file
+ * and to snapshot servers for visualization.
+ * Currently designed to work with only 1 file at a time; will be enhanced 
later to support
+ * multiple files dropped into the monitored directory at the same time.
+ *
+ * <p>
+ * Receives per-window list of pairs (word, frequency) on the input port. When 
the end of a file
+ * is reached, expects to get an EOF on the control port; at the next 
endWindow, the top N words
+ * and frequencies are computed and emitted to the output ports.
+ * <p>
+ * There are 3 output ports: (a) One for the per-file top N counts emitted 
when the file is fully
+ * read and is written to the output file. (b) One for the top N counts 
emitted per window for the
+ * current file to the snapshot server and (c) One for the global top N counts 
emitted per window
+ * to a different snapshot server.
+ *
+ * Since the EOF is received by a single operator, this operator cannot be 
partitionable
+ *
+ * @since 3.2.0
+ */
+public class FileWordCount extends BaseOperator
+{
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileWordCount.class);
+  private static final String GLOBAL = "global";
+
+  /**
+   * If {@literal topN > 0}, only data for the topN most frequent words is 
output; if topN == 0, the
+   * entire frequency map is output
+   */
+  protected int topN;
+
+  /**
+   * Set to true when an EOF control tuple for the current input file is 
received; reset to false
+   * when the corresponding output file has been written.
+   */
+  protected boolean eof = false;
+
+  /**
+   * Last component of path (just the file name)
+   * incoming value from control tuple
+   */
+  protected String fileName;
+
+  /**
+   * {@literal (word => frequency)} map: current file, all words
+   */
+  protected Map<String, WCPair> wordMapFile = new HashMap<>();
+
+  /**
+   * {@literal (word => frequency)} map: global, all words
+   */
+  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();
+
+  /**
+   * Singleton list with per file data; sent on {@code outputPerFile}
+   */
+  protected transient List<Map<String, Object>> resultPerFile;
+
+  /**
+   * Singleton list with global data; sent on {@code outputGlobal}
+   */
+  protected transient List<Map<String, Object>> resultGlobal;
+
+  /**
+   * Singleton map of {@code fileName} to sorted list of (word, frequency) 
pairs
+   */
+  protected transient Map<String, Object> resultFileFinal;
+
+  /**
+   * final list of (word, frequency) pairs written to output file
+   */
+  protected transient List<WCPair> fileFinalList;
+
+  /**
+   * Input port on which per-window {@literal (word => frequency)} map is 
received; the map
+   * is merged into {@code wordMapFile} and {@code wordMapGlobal}.
+   */
+  public final transient DefaultInputPort<List<WCPair>> input = new 
DefaultInputPort<List<WCPair>>()
+  {
+    @Override
+    public void process(List<WCPair> list)
+    {
+      // blend incoming list into wordMapFile and wordMapGlobal
+      for (WCPair pair : list) {
+        final String word = pair.word;
+        WCPair filePair = wordMapFile.get(word);
+        if (null != filePair) {    // word seen previously in current file
+          WCPair globalPair = wordMapGlobal.get(word);    // cannot be null
+          filePair.freq += pair.freq;
+          globalPair.freq += pair.freq;
+          continue;
+        }
+
+        // new word in current file
+        filePair = new WCPair(word, pair.freq);
+        wordMapFile.put(word, filePair);
+
+        // check global map
+        WCPair globalPair = wordMapGlobal.get(word);    // may be null
+        if (null != globalPair) {    // word seen previously
+          globalPair.freq += pair.freq;
+          continue;
+        }
+
+        // word never seen before
+        globalPair = new WCPair(word, pair.freq);
+        wordMapGlobal.put(word, globalPair);
+      }
+    }
+  };
+
+  /**
+   * Control port on which the current file name is received to indicate EOF
+   */
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<String> control = new 
DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String msg)
+    {
+      if (msg.isEmpty()) {    // sanity check
+        throw new RuntimeException("Empty file path");
+      }
+      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
+      fileName = msg;
+      eof = true;
+      // NOTE: current version only supports processing one file at a time.
+    }
+  };
+
+  /**
+   * Output port for current file output
+   */
+  public final transient DefaultOutputPort<List<Map<String, Object>>>
+      outputPerFile = new DefaultOutputPort<>();
+
+  /**
+   * Output port for global output
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<List<Map<String, Object>>>
+      outputGlobal = new DefaultOutputPort<>();
+
+  /**
+   * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} 
is the final
+   * top N pairs for current file and will be written to the output file; 
emitted in the
+   * {@code endWindow()} call after an EOF
+   */
+  public final transient DefaultOutputPort<Map<String, Object>>
+      fileOutput = new DefaultOutputPort<>();
+
+  /**
+   * Get the number of top (word, frequency) pairs that will be output
+   */
+  public int getTopN()
+  {
+    return topN;
+  }
+
+  /**
+   * Set the number of top (word, frequency) pairs that will be output
+   * @param n The new number
+   */
+  public void setTopN(int n)
+  {
+    topN = n;
+  }
+
+  /**
+   * {@inheritDoc}
+   * Initialize various map and list fields
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    if (null == wordMapFile) {
+      wordMapFile = new HashMap<>();
+    }
+    if (null == wordMapGlobal) {
+      wordMapGlobal = new HashMap<>();
+    }
+    resultPerFile = new ArrayList(1);
+    resultGlobal = new ArrayList(1);
+    // singleton map {<fileName> => fileFinalList}; cannot populate it yet 
since we need fileName
+    resultFileFinal = new HashMap<>(1);
+    fileFinalList = new ArrayList<>();
+  }
+
+  /**
+   * {@inheritDoc}
+   * This is where we do most of the work:
+   * 1. Sort global map and emit top N pairs
+   * 2. Sort current file map and emit top N pairs
+   * 3. If we've seen EOF, emit top N pairs on port connected to file writer 
and clear all per-file
+   *    data structures.
+   */
+  @Override
+  public void endWindow()
+  {
+    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);
+
+    if (wordMapFile.isEmpty()) {    // no words found
+      if (eof) {                    // write empty list to fileOutput port
+        // got EOF, so output empty list to output file
+        fileFinalList.clear();
+        resultFileFinal.put(fileName, fileFinalList);
+        fileOutput.emit(resultFileFinal);
+
+        // reset for next file
+        eof = false;
+        fileName = null;
+        resultFileFinal.clear();
+      }
+      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", 
fileName, topN);
+      return;
+    }
+
+    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = 
{}", fileName, wordMapFile.size(), topN);
+
+    // get topN list for this file and, if we have EOF, emit to fileOutput port
+
+    // get topN global list and emit to global output port
+    getTopNMap(wordMapGlobal, resultGlobal);
+    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
+    outputGlobal.emit(resultGlobal);
+
+    // get topN list for this file and emit to file output port
+    getTopNMap(wordMapFile, resultPerFile);
+    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
+    outputPerFile.emit(resultPerFile);
+
+    if (eof) {                     // got EOF earlier
+      if (null == fileName) {      // need file name to emit topN pairs to 
file writer
+        throw new RuntimeException("EOF but no fileName at endWindow");
+      }
+
+      // so compute final topN list from wordMapFile into fileFinalList and 
emit it
+      getTopNList(wordMapFile);
+      resultFileFinal.put(fileName, fileFinalList);
+      fileOutput.emit(resultFileFinal);
+
+      // reset for next file
+      eof = false;
+      fileName = null;
+      wordMapFile.clear();
+      resultFileFinal.clear();
+    }
+  }
+
+  /**
+   * Get topN frequencies from map, convert each pair to a singleton map and 
append to result
+   * This map is suitable input to AppDataSnapshotServer
+   * MUST have {@code map.size() > 0} here
+   */
+  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, 
Object>> result)
+  {
+    final ArrayList<WCPair> list = new ArrayList<>(map.values());
+
+    // sort entries in descending order of frequency
+    Collections.sort(list, new Comparator<WCPair>()
+    {
+      @Override
+      public int compare(WCPair o1, WCPair o2)
+      {
+        return (int)(o2.freq - o1.freq);
+      }
+    });
+
+    if (topN > 0) {
+      list.subList(topN, map.size()).clear();      // retain only the first 
topN entries
+    }
+
+    // convert each pair (word, freq) of list to a map with 2 elements
+    // {("word": <word>, "count": freq)} and append to list
+    //
+    result.clear();
+    for (WCPair pair : list) {
+      Map<String, Object> wmap = new HashMap<>(2);
+      wmap.put("word", pair.word);
+      wmap.put("count", pair.freq);
+      result.add(wmap);
+    }
+    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
+    list.clear();
+  }
+
+  /**
+   * Populate fileFinalList with topN frequencies from argument
+   * This list is suitable input to WordCountWriter which writes it to a file
+   * MUST have {@code map.size() > 0} here
+   */
+  private void getTopNList(final Map<String, WCPair> map)
+  {
+    fileFinalList.clear();
+    fileFinalList.addAll(map.values());
+
+    // sort entries in descending order of frequency
+    Collections.sort(fileFinalList, new Comparator<WCPair>()
+    {
+      @Override
+      public int compare(WCPair o1, WCPair o2)
+      {
+        return (int)(o2.freq - o1.freq);
+      }
+    });
+
+    if (topN > 0) {
+      fileFinalList.subList(topN, map.size()).clear();      // retain only the 
first topN entries
+    }
+    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", 
fileFinalList.size());
+  }
+}

Reply via email to