http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/META-INF/properties.xml b/examples/twitter/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..3f6e74d --- /dev/null +++ b/examples/twitter/src/main/resources/META-INF/properties.xml @@ -0,0 +1,121 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> + +<!-- properties for rolling top words example --> +<configuration> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1256</value> + </property> + <!-- default operator size 256MB --> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128M</value> + </property> + + <!-- default buffer memory 256MB --> + <property> + <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.operator.TweetSampler.consumerKey</name> + </property> + <property> + <name>dt.operator.TweetSampler.consumerSecret</name> + </property> + <property> + <name>dt.operator.TweetSampler.accessToken</name> + </property> + <property> + <name>dt.operator.TweetSampler.accessTokenSecret</name> + </property> + <property> + <name>dt.operator.TweetSampler.feedMultiplierVariance</name> + <value>5</value> + </property> + <property> + <name>dt.operator.TweetSampler.feedMultiplier</name> + <value>20</value> + </property> + + <!-- RollingTopWordsExample --> + + <property> + <name>dt.application.RollingTopWordsExample.operator.TopCounter.topCount</name> + <value>10</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.stream.TweetStream.locality</name> + <value>CONTAINER_LOCAL</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> + <value>TwitterWordsQuery</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.operator.QueryResult.topic</name> + <value>TwitterWordsQueryResult</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.operator.QueryResult.numRetries</name> + <value>2147483647</value> + </property> + + <!-- TwitterExample --> + + <property> + 
<name>dt.application.TwitterExample.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> + <value>TwitterURLQuery</value> + </property> + <property> + <name>dt.application.TwitterExample.operator.QueryResult.topic</name> + <value>TwitterURLQueryResult</value> + </property> + <property> + <name>dt.application.TwitterExample.operator.QueryResult.numRetries</name> + <value>2147483647</value> + </property> + <property> + <name>dt.application.TwitterExample.operator.UniqueURLCounter.attr.APPLICATION_WINDOW_COUNT</name> + <value>60</value> + </property> + + <!-- TwitterTrendingExample --> + + <property> + <name>dt.application.TwitterTrendingExample.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> + <value>TwitterHashtagQueryExample</value> + </property> + <property> + <name>dt.application.TwitterTrendingExample.operator.QueryResult.topic</name> + <value>TwitterHashtagQueryResultExample</value> + </property> + <property> + <name>dt.application.TwitterTrendingExample.operator.QueryResult.numRetries</name> + <value>2147483647</value> + </property> + +</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/mysql.sql ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/mysql.sql b/examples/twitter/src/main/resources/mysql.sql new file mode 100644 index 0000000..e0b97dd --- /dev/null +++ b/examples/twitter/src/main/resources/mysql.sql @@ -0,0 +1,35 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
+-- + +DROP TABLE if exists tweets; +CREATE TABLE tweets ( + window_id BIGINT NOT NULL, -- 64-bit window id (MySQL treats LONG as an alias for MEDIUMTEXT) + creation_date DATE, + text VARCHAR(256) NOT NULL, + userid VARCHAR(40) NOT NULL, + KEY ( userid, creation_date) + ); + +drop table if exists dt_window_id_tracker; +CREATE TABLE dt_window_id_tracker ( + dt_application_id VARCHAR(100) NOT NULL, + dt_operator_id int(11) NOT NULL, + dt_window_id bigint NOT NULL, + UNIQUE (dt_application_id, dt_operator_id, dt_window_id) +) ENGINE=MyISAM DEFAULT CHARSET=latin1; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/top_urls.tplg.properties ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/top_urls.tplg.properties b/examples/twitter/src/main/resources/top_urls.tplg.properties new file mode 100644 index 0000000..c106d7d --- /dev/null +++ b/examples/twitter/src/main/resources/top_urls.tplg.properties @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +stram.node.twitterfeed.classname=com.datatorrent.example.twitter.TwitterSampleInput + +stram.stream.status.source=twitterfeed.output +stram.stream.status.sinks=urlextractor.input + +stram.node.urlextractor.classname=com.datatorrent.example.twitter.TwitterStatusURLExtractor + +stram.stream.collapsedurls.source=urlextractor.output +stram.stream.collapsedurls.sinks= + +stram.node. +stram.stream.partitionedtf.input=twitterfeed.output +stram.stream.partitionedtf.output=partitioned_counter.input +stram.stream.partitionedtf.serdeClassname=com.datatorrent.example.twitter.URLSerDe + +stram.node.partitioned_counter.classname=com.datatorrent.example.twitter.PartitionedCounter +stram.node.partitioned_counter.topCount=10 + +stram.stream.merge_stream.input=partitioned_counter.output +stram.stream.merge_stream.output=merge_counter.input +stram.stream.merge_stream.serdeClassname=com.datatorrent.example.twitter.URLHolderSerde + +stram.node.merge_counter.classname=com.datatorrent.example.twitter.MergeSorter +stram.node.merge_counter.topCount=10 + +stram.stream.merged_stream.input=merge_counter.output +stram.stream.merged_stream.output=console.input + +stram.node.console.classname=com.datatorrent.stream.ConsoleOutputStream http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/twitterHashTagDataSchema.json ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/twitterHashTagDataSchema.json b/examples/twitter/src/main/resources/twitterHashTagDataSchema.json new file mode 100644 index 0000000..0c9296c --- /dev/null +++ b/examples/twitter/src/main/resources/twitterHashTagDataSchema.json @@ -0,0 +1,4 @@ +{ + "values": [{"name": "hashtag", "type": "string"}, + {"name": "count", "type": "integer"}] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/twitterURLDataSchema.json 
---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/twitterURLDataSchema.json b/examples/twitter/src/main/resources/twitterURLDataSchema.json new file mode 100644 index 0000000..ecf723e --- /dev/null +++ b/examples/twitter/src/main/resources/twitterURLDataSchema.json @@ -0,0 +1,4 @@ +{ + "values": [{"name": "url", "type": "string"}, + {"name": "count", "type": "integer"}] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/twitterWordDataSchema.json ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/twitterWordDataSchema.json b/examples/twitter/src/main/resources/twitterWordDataSchema.json new file mode 100644 index 0000000..5e8e7c0 --- /dev/null +++ b/examples/twitter/src/main/resources/twitterWordDataSchema.json @@ -0,0 +1,4 @@ +{ + "values": [{"name": "word", "type": "string"}, + {"name": "count", "type": "integer"}] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java new file mode 100644 index 0000000..fcde9ef --- /dev/null +++ b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterDumpApplicationTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import static org.junit.Assert.assertEquals; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; + +/** + * Test for the application which taps into the twitter's sample input stream and + * dumps all the tweets into a database. + */ +public class TwitterDumpApplicationTest +{ + @Test + public void testPopulateDAG() throws Exception + { + Configuration configuration = new Configuration(false); + + LocalMode lm = LocalMode.newInstance(); + DAG prepareDAG = lm.prepareDAG(new TwitterDumpApplication(), configuration); + DAG clonedDAG = lm.cloneDAG(); + + assertEquals("Serialization", prepareDAG, clonedDAG); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java new file mode 100644 index 0000000..628a807 --- /dev/null +++ b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopCounterTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; +import com.datatorrent.contrib.twitter.TwitterSampleInput; + +/** + * Test the DAG declaration in local mode. + */ +public class TwitterTopCounterTest +{ + /** + * This test requires twitter authentication setup and is skipped by default + * (see {@link TwitterSampleInput}). 
+ * + * @throws Exception + */ + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + new TwitterTopCounterApplication().populateDAG(lma.getDAG(), new Configuration(false)); + LocalMode.Controller lc = lma.getController(); + lc.run(120000); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java new file mode 100644 index 0000000..e888eb3 --- /dev/null +++ b/examples/twitter/src/test/java/org/apache/apex/examples/twitter/TwitterTopWordsTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; +import com.datatorrent.contrib.twitter.TwitterSampleInput; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class TwitterTopWordsTest +{ + /** + * This test requires twitter authentication setup and is skipped by default + * (see {@link TwitterSampleInput}). + * + * @throws Exception + */ + @Test + public void testApplication() throws Exception + { + TwitterTopWordsApplication app = new TwitterTopWordsApplication(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-rollingtopwords.xml"); + LocalMode lma = LocalMode.newInstance(); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.run(120000); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml ---------------------------------------------------------------------- diff --git a/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml b/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml new file mode 100644 index 0000000..fa9b534 --- /dev/null +++ b/examples/twitter/src/test/resources/dt-site-rollingtopwords.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> +<property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.class</name> + <value>org.apache.apex.examples.twitter.TwitterTopWordsApplication</value> + <description>An alias for the application</description> + </property> + <property> + <name>dt.application.RollingTopWordsExample.operator.TopCounter.topCount + </name> + <value>10</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.stream.TweetStream.locality + </name> + <value>CONTAINER_LOCAL</value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.stream.TwittedWords.locality + </name> + <value></value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.stream.UniqueWordCounts.locality + </name> + <value></value> + </property> + <property> + <name>dt.application.RollingTopWordsExample.stream.TopWords.locality</name> + <value></value> + </property> + <property> <!-- Twitter OAuth credentials are intentionally left blank: never commit real keys, supply them from a local untracked configuration --> + <name>dt.operator.TweetSampler.consumerKey</name> + <value></value> + </property> + <property> + <name>dt.operator.TweetSampler.consumerSecret</name> + <value></value> + </property> + <property> + <name>dt.operator.TweetSampler.accessToken</name> + <value></value> + </property> + <property> + <name>dt.operator.TweetSampler.accessTokenSecret</name> + <value></value> + </property> + +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/twitter/src/test/resources/log4j.properties b/examples/twitter/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ 
b/examples/twitter/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/pom.xml ---------------------------------------------------------------------- diff --git a/examples/uniquecount/pom.xml b/examples/uniquecount/pom.xml new file mode 100644 index 0000000..37e4bb1 --- /dev/null +++ b/examples/uniquecount/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-uniquecount</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Unique Count Example</name> + <description></description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.1</version> + <scope>provided</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/assemble/appPackage.xml b/examples/uniquecount/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/uniquecount/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java new file mode 100644 index 0000000..fad3718 --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/Application.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.algo.UniqueCounter; +import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.stream.Counter; +import com.datatorrent.lib.stream.StreamDuplicater; +import com.datatorrent.lib.util.KeyHashValPair; + +/** + * Application to demonstrate the PartitionableUniqueCount operator. <br> + * The input operator generates random keys, which are sent to the + * PartitionableUniqueCount operator, initially partitioned into three partitions to + * test unifier functionality; the output of the operator is sent to a verifier to verify + * that it generates the correct result. 
+ * + * @since 1.0.2 + */ +@ApplicationAnnotation(name = "UniqueValueCountExample") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration entries) + { + /* Generate random key-value pairs */ + RandomKeysGenerator randGen = dag.addOperator("randomgen", new RandomKeysGenerator()); + + + /* Start the unique counter with three partitions (see the PARTITIONER attribute below) */ + // UniqueCount1 uniqCount = dag.addOperator("uniqevalue", new UniqueCount1()); + UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>()); + + MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter<Integer, Integer>()); + + uniqCount.setCumulative(false); + dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3)); + + CountVerifier<Integer> verifier = dag.addOperator("verifier", new CountVerifier<Integer>()); + StreamDuplicater<KeyHashValPair<Integer, Integer>> dup = dag.addOperator("dup", new StreamDuplicater<KeyHashValPair<Integer, Integer>>()); + ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator()); + + ConsoleOutputOperator successOutput = dag.addOperator("successoutput", new ConsoleOutputOperator()); + successOutput.setStringFormat("Success %d"); + ConsoleOutputOperator failureOutput = dag.addOperator("failureoutput", new ConsoleOutputOperator()); + failureOutput.setStringFormat("Failure %d"); + + // success and failure counters. 
+ Counter successcounter = dag.addOperator("successcounter", new Counter()); + Counter failurecounter = dag.addOperator("failurecounter", new Counter()); + + dag.addStream("datain", randGen.outPort, uniqCount.data); + dag.addStream("dataverification0", randGen.verificationPort, verifier.in1); + dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL); + dag.addStream("split", converter.output, dup.data); + dag.addStream("consoutput", dup.out1, output.input); + dag.addStream("dataverification1", dup.out2, verifier.in2); + dag.addStream("successc", verifier.successPort, successcounter.input); + dag.addStream("failurec", verifier.failurePort, failurecounter.input); + dag.addStream("succconsoutput", successcounter.output, successOutput.input); + dag.addStream("failconsoutput", failurecounter.output, failureOutput.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java new file mode 100644 index 0000000..422807e --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/CountVerifier.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import java.util.HashMap; +import java.util.Map; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.KeyHashValPair; + +/* +Compare results and print non-matching values to console. + */ +/** + * <p>CountVerifier class.</p> + * + * @since 1.0.2 + */ +public class CountVerifier<K> implements Operator +{ + HashMap<K, Integer> map1 = new HashMap<K, Integer>(); + HashMap<K, Integer> map2 = new HashMap<K, Integer>(); + + public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = new DefaultInputPort<KeyHashValPair<K, Integer>>() + { + @Override + public void process(KeyHashValPair<K, Integer> tuple) + { + processTuple(tuple, map1); + } + }; + + public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = new DefaultInputPort<KeyHashValPair<K, Integer>>() + { + @Override + public void process(KeyHashValPair<K, Integer> tuple) + { + processTuple(tuple, map2); + } + }; + + void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map) + { + map.put(tuple.getKey(), tuple.getValue()); + } + + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> failurePort = new 
DefaultOutputPort<Integer>(); + + @Override + public void beginWindow(long l) + { + + } + + @Override + public void endWindow() + { + int failureCount = 0; + for (Map.Entry<K, Integer> e : map1.entrySet()) { + K key = e.getKey(); + int val = map2.get(key); + if (val != e.getValue()) { + failureCount++; + } + } + if (failureCount != 0) { + failurePort.emit(failureCount); + } else { + successPort.emit(map1.size()); + } + } + + @Override + public void setup(Context.OperatorContext operatorContext) + { + + } + + @Override + public void teardown() + { + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java new file mode 100644 index 0000000..3e29a70 --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomDataGenerator.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.uniquecount; + +import java.util.HashMap; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Generate random Key value pairs. + * key is string and value is int, it emits the pair as KeyValPair on outPort, + * + * @since 1.0.2 + */ +public class RandomDataGenerator implements InputOperator +{ + public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort = new DefaultOutputPort<KeyValPair<String, Object>>(); + private HashMap<String, Integer> dataInfo; + private final transient Logger LOG = LoggerFactory.getLogger(RandomDataGenerator.class); + private int count; + private int sleepMs = 10; + private int keyRange = 100; + private int valueRange = 100; + private long tupleBlast = 10000; + private Random random; + + public RandomDataGenerator() + { + random = new Random(); + } + + @Override + public void emitTuples() + { + for (int i = 0; i < tupleBlast; i++) { + String key = String.valueOf(random.nextInt(keyRange)); + int val = random.nextInt(valueRange); + outPort.emit(new KeyValPair<String, Object>(key, val)); + } + try { + Thread.sleep(sleepMs); + } catch (Exception ex) { + LOG.error(ex.getMessage()); + } + count++; + } + + public int getSleepMs() + { + return sleepMs; + } + + public void setSleepMs(int sleepMs) + { + this.sleepMs = sleepMs; + } + + public long getTupleBlast() + { + return tupleBlast; + } + + public void setTupleBlast(long tupleBlast) + { + this.tupleBlast = tupleBlast; + } + + @Override + public void beginWindow(long l) + { + + } + + @Override + public void endWindow() + { + LOG.debug("emitTuples called " + count + " times in this window"); + count = 0; + } + + @Override + public void setup(Context.OperatorContext operatorContext) + { + + } + + @Override + public void 
teardown() + { + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java new file mode 100644 index 0000000..6d39c37 --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeyValues.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import java.util.BitSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Input port operator for generating random values on keys. 
<br> + * Key(s) : key + integer in range between 0 and numKeys <br> + * Value(s) : integer in range of 0 to numValuesPerKeys <br> + * + * @since 0.9.3 + */ +public class RandomKeyValues implements InputOperator +{ + public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>(); + private Random random = new Random(11111); + private int numKeys; + private int numValuesPerKeys; + private int tuppleBlast = 1000; + private int emitDelay = 20; /* 20 ms */ + + /* For verification */ + private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>(); + + public RandomKeyValues() + { + this.numKeys = 100; + this.numValuesPerKeys = 100; + } + + public RandomKeyValues(int keys, int values) + { + this.numKeys = keys; + this.numValuesPerKeys = values; + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + @Override + public void emitTuples() + { + /* generate tuples randomly, */ + for (int i = 0; i < tuppleBlast; i++) { + int intKey = random.nextInt(numKeys); + String key = "key" + String.valueOf(intKey); + int value = random.nextInt(numValuesPerKeys); + + // update history for verifying later. + BitSet bmap = history.get(intKey); + if (bmap == null) { + bmap = new BitSet(); + history.put(intKey, bmap); + } + bmap.set(value); + + // emit the key with value. + outport.emit(new KeyValPair<String, Object>(key, value)); + } + try { + Thread.sleep(emitDelay); + } catch (Exception e) { + // Ignore. 
+ } + } + + public int getNumKeys() + { + return numKeys; + } + + public void setNumKeys(int numKeys) + { + this.numKeys = numKeys; + } + + public int getNumValuesPerKeys() + { + return numValuesPerKeys; + } + + public void setNumValuesPerKeys(int numValuesPerKeys) + { + this.numValuesPerKeys = numValuesPerKeys; + } + + public int getTuppleBlast() + { + return tuppleBlast; + } + + public void setTuppleBlast(int tuppleBlast) + { + this.tuppleBlast = tuppleBlast; + } + + public int getEmitDelay() + { + return emitDelay; + } + + public void setEmitDelay(int emitDelay) + { + this.emitDelay = emitDelay; + } + + public void debug() + { + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java new file mode 100644 index 0000000..9f26b4e --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/RandomKeysGenerator.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.util.KeyHashValPair; + +/* + Generate random keys. + */ +/** + * <p>RandomKeysGenerator class.</p> + * + * @since 1.0.2 + */ +public class RandomKeysGenerator implements InputOperator +{ + + protected int numKeys = 100; + protected int tupleBlast = 15000; + protected long sleepTime = 0; + protected Map<Integer, MutableInt> history = new HashMap<Integer, MutableInt>(); + private Random random = new Random(); + private Date date = new Date(); + private long start; + + @OutputPortFieldAnnotation(optional = false) + public transient DefaultOutputPort<Integer> outPort = new DefaultOutputPort<Integer>(); + + @OutputPortFieldAnnotation(optional = true) + public transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> verificationPort = + new DefaultOutputPort<KeyHashValPair<Integer, Integer>>(); + + @Override + public void emitTuples() + { + for (int i = 0; i < tupleBlast; i++) { + int key = random.nextInt(numKeys); + outPort.emit(key); + + + if (verificationPort.isConnected()) { + // maintain history for later verification. + MutableInt count = history.get(key); + if (count == null) { + count = new MutableInt(0); + history.put(key, count); + } + count.increment(); + } + + } + try { + if (sleepTime != 0) { + Thread.sleep(sleepTime); + } + } catch (Exception ex) { + // Ignore. 
+ } + } + + public RandomKeysGenerator() + { + start = date.getTime(); + } + + @Override + public void beginWindow(long l) + { + + } + + @Override + public void endWindow() + { + + if (verificationPort.isConnected()) { + for (Map.Entry<Integer, MutableInt> e : history.entrySet()) { + verificationPort.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), e.getValue().toInteger())); + } + history.clear(); + } + + } + + @Override + public void setup(Context.OperatorContext operatorContext) + { + + } + + @Override + public void teardown() + { + + } + + public int getNumKeys() + { + return numKeys; + } + + public void setNumKeys(int numKeys) + { + this.numKeys = numKeys; + } + + public int getTupleBlast() + { + return tupleBlast; + } + + public void setTupleBlast(int tupleBlast) + { + this.tupleBlast = tupleBlast; + } + + public long getSleepTime() + { + return sleepTime; + } + + public void setSleepTime(long sleepTime) + { + this.sleepTime = sleepTime; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java new file mode 100644 index 0000000..ff9fdf0 --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/UniqueKeyValCountExample.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.common.partitioner.StatelessPartitioner; + +import com.datatorrent.lib.algo.UniqueCounter; +import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.util.KeyValPair; + +/** + * <p>UniqueKeyValCountExample class.</p> + * + * @since 1.0.2 + */ +@ApplicationAnnotation(name = "UniqueKeyValueCountExample") +public class UniqueKeyValCountExample implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration entries) + { + /* Generate random key-value pairs */ + RandomDataGenerator randGen = dag.addOperator("randomgen", new RandomDataGenerator()); + + /* Initialize with three partition to start with */ + UniqueCounter<KeyValPair<String, Object>> uniqCount = + dag.addOperator("uniqevalue", new UniqueCounter<KeyValPair<String, Object>>()); + MapToKeyHashValuePairConverter<KeyValPair<String, Object>, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter()); + uniqCount.setCumulative(false); + 
dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<KeyValPair<String, Object>>>(3)); + + ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator()); + + dag.addStream("datain", randGen.outPort, uniqCount.data); + dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL); + dag.addStream("consoutput", converter.output, output.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java new file mode 100644 index 0000000..713cfb9 --- /dev/null +++ b/examples/uniquecount/src/main/java/org/apache/apex/examples/uniquecount/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + Example Application for new Paritionable UniqueCount Operator. 
+ */ +package org.apache.apex.examples.uniquecount; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/main/resources/META-INF/properties.xml b/examples/uniquecount/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..8742328 --- /dev/null +++ b/examples/uniquecount/src/main/resources/META-INF/properties.xml @@ -0,0 +1,29 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- + <property> + <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> + </property> + --> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java new file mode 100644 index 0000000..95242ee --- /dev/null +++ b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/ApplicationTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest +{ + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + new Application().populateDAG(lma.getDAG(), new Configuration(false)); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java new file mode 100644 index 0000000..6492c9d --- /dev/null +++ b/examples/uniquecount/src/test/java/org/apache/apex/examples/uniquecount/UniqueKeyValExampleTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.uniquecount; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class UniqueKeyValExampleTest +{ + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + new UniqueKeyValCountExample().populateDAG(lma.getDAG(), new Configuration(false)); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/uniquecount/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/uniquecount/src/test/resources/log4j.properties b/examples/uniquecount/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/uniquecount/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/pom.xml ---------------------------------------------------------------------- diff --git a/examples/wordcount/pom.xml b/examples/wordcount/pom.xml new file mode 100644 index 0000000..9106c7c --- /dev/null +++ b/examples/wordcount/pom.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-wordcount</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Wordcount Example</name> + <description>A very simple application that demonstrates Apex Platform's streaming window feature.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>6.6.4</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/assemble/appPackage.xml b/examples/wordcount/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/wordcount/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java ---------------------------------------------------------------------- 
diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java new file mode 100644 index 0000000..6652b79 --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/Application.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.wordcount; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.algo.UniqueCounter; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +/** + * Simple Word Count Example : <br> + * This is application to count total occurrence of each word from file or any + * stream. <br> + * <br> + * + * Functional Description : <br> + * This example declares custom input operator to read data file set by user. <br> + * This input operator can be replaced by any stream input operator. 
<br> + * <br> + * + * Custom Attribute(s) : None <br> + * <br> + * + * Input Adapter : <br> + * Word input operator opens user specified data file and streams each line to + * application. <br> + * <br> + * + * Output Adapter : <br> + * Output values are written to console through ConsoleOutputOperator<br> + * If needed you can use other output adapters<br> + * <br> + * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see following output on console: <br> + * <pre> + * {developed=1} {bush’s=2} {roster=1} {council=1} {mankiw=1} {academia=1} + * {of=6} {help=1} {are=1} {presidential=1} + * </pre> <br> <br> + * + * Scaling Options : <br> + * This operator app can not be scaled, please look at implementation {@link com.datatorrent.lib.algo.UniqueCounterEach} <br> <br> + * + * Application DAG : <br> + * <img src="doc-files/UniqueWordCounter.jpg" width=600px > <br> + * + * Streaming Window Size : 500ms + * Operator Details : <br> + * <ul> + * <li> + * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application. + * This can be replaced by any input stream by the user. <br> + * Class : {@link WordCountInputOperator} <br> + * Operator Application Window Count : 1 <br> + * StateFull : No + * </li> + * <li> + * <p><b> The operator count : </b> This operator aggregates unique key count over one window count(app). <br> + * Class : {@link com.datatorrent.lib.algo.UniqueCounterEach} <br> + * Operator Application Window Count : 1 <br> + * StateFull : No + * </li> + * <li> + * <p><b>The operator Console: </b> This operator just outputs the input tuples to the console (or stdout). + * In this case it emits the unique count of each word over 500ms.
+ * </li> + * </ul> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = "WordCountExample") +public class Application implements StreamingApplication +{ + /** + * Builds the three-operator pipeline wordinput -> count -> console. + * + * @param dag the DAG to which the operators and streams are added + * @param conf the configuration (not read by this method) + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // source: emits the words to be counted on its outputPort + WordCountInputOperator input = dag.addOperator("wordinput", new WordCountInputOperator()); + // counter: receives words on its data port and emits counts on its count port + UniqueCounter<String> wordCount = dag.addOperator("count", new UniqueCounter<String>()); + dag.addStream("wordinput-count", input.outputPort, wordCount.data); + // sink: the counts are handed to a console output operator + ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator()); + dag.addStream("count-console",wordCount.count, consoleOperator.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java new file mode 100644 index 0000000..699469b --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/ApplicationWithQuerySupport.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.wordcount; + +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; + +/** + * Simple example that computes word frequencies from any file dropped into a + * monitored directory. It outputs the top N word-frequency pairs for each file + * as well as globally across all files. + * <p> + * Each input file generates a corresponding output file in the output directory + * containing the top N pairs for that file. The output is also written + * to an internal store to support visualization in the UI via queries. + * <p> + * @since 3.2.0 + */ +@ApplicationAnnotation(name = "TopNWordsWithQueries") +public class ApplicationWithQuerySupport implements StreamingApplication +{ + private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class); + + /** + * Name of the snapshot schema resource; {@code populateDAG} loads it with + * {@code SchemaUtils.jarResourceFileToString} and hands the result to both snapshot servers.
+ */ + public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json"; + + /** + * Populates the DAG with operators and connecting streams + * + * @param dag The directed acyclic graph of operators to populate + * @param conf The configuration + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // create operators + LineReader lineReader = dag.addOperator("lineReader", new LineReader()); + WordReader wordReader = dag.addOperator("wordReader", new WordReader()); + WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount()); + FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount()); + WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter()); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + console.setStringFormat("wordCount: %s"); + + // create streams + + dag.addStream("lines", lineReader.output, wordReader.input); + dag.addStream("control", lineReader.control, fileWordCount.control); + dag.addStream("words", wordReader.output, windowWordCount.input); + dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input); + dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input); + + // query support is wired in only when a gateway address is set on the DAG + String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + + if (!StringUtils.isEmpty(gatewayAddress)) { // add query support + URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + + // one snapshot server for the per-file counts, one for the global counts + AppDataSnapshotServerMap snapshotServerFile + = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap()); + AppDataSnapshotServerMap snapshotServerGlobal + = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap()); + + // both snapshot servers are given the same schema + String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); + snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON); + snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON); + + // the query objects are not added to the DAG; they are embedded in the snapshot servers below + PubSubWebSocketAppDataQuery wsQueryFile = new
PubSubWebSocketAppDataQuery(); + PubSubWebSocketAppDataQuery wsQueryGlobal = new PubSubWebSocketAppDataQuery(); + wsQueryFile.setUri(uri); + wsQueryGlobal.setUri(uri); + + snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile); + snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal); + + // result operators use the same pubsub URI as the queries + PubSubWebSocketAppDataResult wsResultFile + = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult()); + PubSubWebSocketAppDataResult wsResultGlobal + = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult()); + wsResultFile.setUri(uri); + wsResultGlobal.setUri(uri); + + Operator.InputPort<String> queryResultFilePort = wsResultFile.input; + Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input; + + // per-file counts go to both the snapshot server and the console; global counts only to their snapshot server + dag.addStream("WordCountsFile", fileWordCount.outputPerFile, snapshotServerFile.input, console.input); + dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, snapshotServerGlobal.input); + + dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort); + dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort); + } else { + // no gateway address: query support is skipped (not treated as an error) and the per-file counts go to the console only + dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input); + } + + LOG.info("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled()); + LOG.info("Returning from populateDAG"); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java ---------------------------------------------------------------------- diff --git a/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java new file mode 100644 index 0000000..a30d0b0 --- /dev/null +++ b/examples/wordcount/src/main/java/org/apache/apex/examples/wordcount/FileWordCount.java @@ -0,0 +1,349 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.wordcount;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;

/**
 * Computes word frequencies per file and globally, and writes the top N pairs to an output file
 * and to snapshot servers for visualization.
 * Currently designed to work with only 1 file at a time; will be enhanced later to support
 * multiple files dropped into the monitored directory at the same time.
 *
 * <p>
 * Receives per-window list of pairs (word, frequency) on the input port. When the end of a file
 * is reached, expects to get an EOF on the control port; at the next endWindow, the top N words
 * and frequencies are computed and emitted to the output ports.
 * <p>
 * There are 3 output ports: (a) One for the per-file top N counts emitted when the file is fully
 * read and is written to the output file. (b) One for the top N counts emitted per window for the
 * current file to the snapshot server and (c) One for the global top N counts emitted per window
 * to a different snapshot server.
 *
 * Since the EOF is received by a single operator, this operator cannot be partitionable
 *
 * @since 3.2.0
 */
public class FileWordCount extends BaseOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(FileWordCount.class);

  /**
   * Orders pairs by descending frequency. Uses explicit comparisons instead of subtracting the
   * frequencies, because a narrowed or overflowing difference can report the wrong sign and
   * break the {@link Comparator} contract.
   */
  private static final Comparator<WCPair> DESCENDING_FREQ = new Comparator<WCPair>()
  {
    @Override
    public int compare(WCPair o1, WCPair o2)
    {
      return o1.freq > o2.freq ? -1 : (o1.freq < o2.freq ? 1 : 0);
    }
  };

  /**
   * If {@literal topN > 0}, only data for the topN most frequent words is output; if topN == 0, the
   * entire frequency map is output
   */
  protected int topN;

  /**
   * Set to true when an EOF control tuple for the current input file is received; reset to false
   * when the corresponding output file has been written.
   */
  protected boolean eof = false;

  /**
   * Last component of path (just the file name)
   * incoming value from control tuple
   */
  protected String fileName;

  /**
   * {@literal (word => frequency)} map: current file, all words
   */
  protected Map<String, WCPair> wordMapFile = new HashMap<>();

  /**
   * {@literal (word => frequency)} map: global, all words
   */
  protected Map<String, WCPair> wordMapGlobal = new HashMap<>();

  /**
   * Per-file data sent on {@code outputPerFile}: one {@code {"word", "count"}} map per retained word,
   * in descending order of frequency
   */
  protected transient List<Map<String, Object>> resultPerFile;

  /**
   * Global data sent on {@code outputGlobal}: one {@code {"word", "count"}} map per retained word,
   * in descending order of frequency
   */
  protected transient List<Map<String, Object>> resultGlobal;

  /**
   * Singleton map of {@code fileName} to sorted list of (word, frequency) pairs
   */
  protected transient Map<String, Object> resultFileFinal;

  /**
   * final list of (word, frequency) pairs written to output file
   */
  protected transient List<WCPair> fileFinalList;

  /**
   * Input port on which the per-window list of (word, frequency) pairs is received; each pair
   * is merged into both {@code wordMapFile} and {@code wordMapGlobal}.
   */
  public final transient DefaultInputPort<List<WCPair>> input = new DefaultInputPort<List<WCPair>>()
  {
    @Override
    public void process(List<WCPair> list)
    {
      for (WCPair pair : list) {
        addTo(wordMapFile, pair);
        addTo(wordMapGlobal, pair);
      }
    }
  };

  /**
   * Control port on which the current file name is received to indicate EOF
   */
  @InputPortFieldAnnotation(optional = true)
  public final transient DefaultInputPort<String> control = new DefaultInputPort<String>()
  {
    @Override
    public void process(String msg)
    {
      if (msg.isEmpty()) {    // sanity check
        throw new RuntimeException("Empty file path");
      }
      LOG.info("FileWordCount: EOF for {}, topN = {}", msg, topN);
      fileName = msg;
      eof = true;
      // NOTE: current version only supports processing one file at a time.
    }
  };

  /**
   * Output port for current file output
   */
  public final transient DefaultOutputPort<List<Map<String, Object>>>
      outputPerFile = new DefaultOutputPort<>();

  /**
   * Output port for global output
   */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<List<Map<String, Object>>>
      outputGlobal = new DefaultOutputPort<>();

  /**
   * Tuple is singleton map {@code fileName => TopNMap} where {@code TopNMap} is the final
   * top N pairs for current file and will be written to the output file; emitted in the
   * {@code endWindow()} call after an EOF
   */
  public final transient DefaultOutputPort<Map<String, Object>>
      fileOutput = new DefaultOutputPort<>();

  /**
   * Get the number of top (word, frequency) pairs that will be output
   */
  public int getTopN()
  {
    return topN;
  }

  /**
   * Set the number of top (word, frequency) pairs that will be output
   * @param n The new number
   */
  public void setTopN(int n)
  {
    topN = n;
  }

  /**
   * {@inheritDoc}
   * Initialize various map and list fields
   */
  @Override
  public void setup(OperatorContext context)
  {
    if (null == wordMapFile) {
      wordMapFile = new HashMap<>();
    }
    if (null == wordMapGlobal) {
      wordMapGlobal = new HashMap<>();
    }
    resultPerFile = new ArrayList<>(1);
    resultGlobal = new ArrayList<>(1);
    // singleton map {<fileName> => fileFinalList}; cannot populate it yet since we need fileName
    resultFileFinal = new HashMap<>(1);
    fileFinalList = new ArrayList<>();
  }

  /**
   * {@inheritDoc}
   * This is where we do most of the work:
   * 1. Sort global map and emit top N pairs
   * 2. Sort current file map and emit top N pairs
   * 3. If we've seen EOF, emit top N pairs on port connected to file writer and clear all per-file
   *    data structures.
   * If the current file has produced no words, steps 1 and 2 are skipped and, on EOF, an empty
   * list is emitted for the file.
   */
  @Override
  public void endWindow()
  {
    LOG.info("FileWordCount: endWindow for {}, topN = {}", fileName, topN);

    if (wordMapFile.isEmpty()) {    // no words found
      // log before the reset below so the message still carries the file name
      LOG.info("FileWordCount: endWindow for {}, no words, topN = {}", fileName, topN);
      if (eof) {
        // got EOF, so output empty list to output file
        fileFinalList.clear();
        emitFinalAndReset();
      }
      return;
    }

    LOG.info("FileWordCount: endWindow for {}, wordMapFile.size = {}, topN = {}", fileName, wordMapFile.size(), topN);

    // get topN global list and emit to global output port
    getTopNMap(wordMapGlobal, resultGlobal);
    LOG.info("FileWordCount: resultGlobal.size = {}", resultGlobal.size());
    outputGlobal.emit(resultGlobal);

    // get topN list for this file and emit to file output port
    getTopNMap(wordMapFile, resultPerFile);
    LOG.info("FileWordCount: resultPerFile.size = {}", resultPerFile.size());
    outputPerFile.emit(resultPerFile);

    if (eof) {     // got EOF earlier
      // compute final topN list from wordMapFile into fileFinalList and emit it
      getTopNList(wordMapFile);
      emitFinalAndReset();
    }
  }

  /**
   * Adds the frequency of {@code pair} to the entry for the same word in {@code totals},
   * creating the entry when the word has not been seen before. Each map is handled on its own
   * so a word missing from one of them can never cause a null dereference.
   */
  private static void addTo(Map<String, WCPair> totals, WCPair pair)
  {
    final WCPair total = totals.get(pair.word);
    if (null == total) {
      totals.put(pair.word, new WCPair(pair.word, pair.freq));
    } else {
      total.freq += pair.freq;
    }
  }

  /**
   * Emits {@code fileName => fileFinalList} on {@code fileOutput} and resets all per-file state
   * so the next file starts clean.
   *
   * @throws RuntimeException if no file name has been recorded for the EOF
   */
  private void emitFinalAndReset()
  {
    if (null == fileName) {    // need file name to emit topN pairs to file writer
      throw new RuntimeException("EOF but no fileName at endWindow");
    }
    resultFileFinal.put(fileName, fileFinalList);
    fileOutput.emit(resultFileFinal);
    // NOTE(review): the emitted map and list are reused and cleared right after emit; this
    // presumably relies on the downstream not holding on to the tuple by reference -- confirm
    // before connecting fileOutput over a thread- or container-local stream.

    // reset for next file
    eof = false;
    fileName = null;
    wordMapFile.clear();
    resultFileFinal.clear();
  }

  /**
   * Sorts {@code list} by descending frequency and, if {@code topN > 0}, drops everything after
   * the first topN entries. A list with topN or fewer entries is left at full length.
   */
  private void sortAndTruncate(List<WCPair> list)
  {
    Collections.sort(list, DESCENDING_FREQ);
    if (topN > 0 && topN < list.size()) {
      list.subList(topN, list.size()).clear();
    }
  }

  /**
   * Get topN frequencies from map, convert each pair to a map with keys "word" and "count" and
   * append it to result (which is cleared first).
   * This list is suitable input to AppDataSnapshotServer
   */
  private void getTopNMap(final Map<String, WCPair> map, List<Map<String, Object>> result)
  {
    final List<WCPair> list = new ArrayList<>(map.values());
    sortAndTruncate(list);

    // convert each pair (word, freq) of list to a map with 2 elements
    // {("word": <word>, "count": freq)} and append to result
    result.clear();
    for (WCPair pair : list) {
      Map<String, Object> wmap = new HashMap<>(2);
      wmap.put("word", pair.word);
      wmap.put("count", pair.freq);
      result.add(wmap);
    }
    LOG.info("FileWordCount:getTopNMap: result.size = {}", result.size());
  }

  /**
   * Populate fileFinalList with topN frequencies from argument
   * This list is suitable input to WordCountWriter which writes it to a file
   */
  private void getTopNList(final Map<String, WCPair> map)
  {
    fileFinalList.clear();
    fileFinalList.addAll(map.values());
    sortAndTruncate(fileFinalList);
    LOG.info("FileWordCount:getTopNList: fileFinalList.size = {}", fileFinalList.size());
  }
}
