http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java ---------------------------------------------------------------------- diff --git a/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java new file mode 100644 index 0000000..7bbb8ec --- /dev/null +++ b/examples/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sql.sample; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.TimeZone; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Predicates; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; + +import com.datatorrent.api.LocalMode; + +public class SQLApplicationWithModelFileTest +{ + private TimeZone defaultTZ; + + @Before + public void setUp() throws Exception + { + defaultTZ = TimeZone.getDefault(); + TimeZone.setDefault(TimeZone.getTimeZone("GMT")); + } + + @After + public void tearDown() throws Exception + { + TimeZone.setDefault(defaultTZ); + } + + @Test + public void test() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml")); + + SQLApplicationWithModelFile app = new SQLApplicationWithModelFile(); + + lma.prepareDAG(app, conf); + + LocalMode.Controller lc = lma.getController(); + + PrintStream originalSysout = System.out; + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + + lc.runAsync(); + waitTillStdoutIsPopulated(baos, 30000); + lc.shutdown(); + + System.setOut(originalSysout); + + String[] sout = baos.toString().split(System.lineSeparator()); + Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); + + String[] actualLines = filter.toArray(new String[filter.size()]); + Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1")); 
+ Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2")); + Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3")); + Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4")); + Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5")); + Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6")); + } + + public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException, + IOException + { + long now = System.currentTimeMillis(); + Collection<String> filter = Lists.newArrayList(); + while (System.currentTimeMillis() - now < timeout) { + baos.flush(); + String[] sout = baos.toString().split(System.lineSeparator()); + filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); + if (filter.size() != 0) { + break; + } + + Thread.sleep(500); + } + + return (filter.size() != 0); + } + +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/resources/input.csv ---------------------------------------------------------------------- diff --git a/examples/sql/src/test/resources/input.csv b/examples/sql/src/test/resources/input.csv new file mode 100644 index 0000000..c4786d1 --- /dev/null +++ b/examples/sql/src/test/resources/input.csv @@ -0,0 +1,6 @@ +15/02/2016 10:15:00 +0000,1,paint1,11 +15/02/2016 10:16:00 +0000,2,paint2,12 +15/02/2016 10:17:00 +0000,3,paint3,13 +15/02/2016 10:18:00 +0000,4,paint4,14 +15/02/2016 10:19:00 +0000,5,paint5,15 +15/02/2016 10:10:00 +0000,6,abcde6,16 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/sql/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/sql/src/test/resources/log4j.properties b/examples/sql/src/test/resources/log4j.properties new file mode 100644 index 0000000..8ea3cfe --- /dev/null +++ b/examples/sql/src/test/resources/log4j.properties @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=WARN +test.log.console.threshold=WARN + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=INFO +log4j.logger.org.apache.apex=INFO + +log4j.logger.org.apache.calcite=WARN +log4j.logger.org.apache.kafka=WARN +log4j.logger.org.I0Itec.zkclient.ZkClient=WARN +log4j.logger.org.apache.zookeeper=WARN +log4j.logger.kafka=WARN +log4j.logger.kafka.consumer=WARN http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml new file mode 100644 index 0000000..76924c9 --- /dev/null +++ b/examples/twitter/pom.xml @@ -0,0 +1,101 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-twitter</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Twitter Example</name> + <description>Twitter Rolling Top Words application demonstrates real-time computations over a sliding window.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <properties> + <skipTests>true</skipTests> + </properties> + + <dependencies> + <dependency> + <!-- required by twitter example --> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-core</artifactId> + <version>4.0.6</version> + </dependency> + <dependency> + <!-- required by twitter example --> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>4.0.6</version> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase</artifactId> + <version>0.94.20</version> + <type>jar</type> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>mysql</groupId> + 
<artifactId>mysql-connector-java</artifactId> + <version>5.1.22</version> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-kinesis</artifactId> + <version>1.9.10</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.4.4</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>6.6.4</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/twitter/src/assemble/appPackage.xml b/examples/twitter/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/twitter/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java new file mode 100644 index 0000000..be7edfb --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/KinesisHashtagsApplication.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import java.net.URI; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator.InputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator; +import com.datatorrent.contrib.kinesis.KinesisStringInputOperator; +import com.datatorrent.contrib.kinesis.KinesisStringOutputOperator; +import com.datatorrent.contrib.kinesis.ShardManager; +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.algo.UniqueCounter; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; + +/** + * Twitter Example Application: <br> + * This example application samples random public status from twitter, send to Hashtag + * extractor and extract the status and send it into kinesis <br> + * Get the records from kinesis and converts into Hashtags. 
The top 10 Hashtag(s) mentioned in + * tweets in the last 5 mins are displayed on every window count (500ms).<br> + * <br> + * + * Real Time Calculation :<br> + * This application calculates the top 10 Hashtags mentioned in tweets in the last 5 + * minutes across a 1% random tweet sampling on a rolling window basis.<br> + * <br> + * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>, + * an <a href="https://aws.amazon.com/">AWS Account</a> and configure the authentication details. + * For launch from CLI, those go into ~/.dt/dt-site.xml: + * <pre> + * {@code + * <?xml version="1.0" encoding="UTF-8"?> + * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + * <configuration> + * + * <property> <name>dt.operator.TweetSampler.prop.consumerKey</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.prop.accessToken</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.FromKinesis.streamName</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.FromKinesis.accessKey</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.FromKinesis.secretKey</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.ToKinesis.streamName</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.ToKinesis.accessKey</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.ToKinesis.secretKey</name> + * <value>TBD</value> </property> + * + * </configuration> + * } + * </pre> + * Custom Attributes: <br> + * <b>topCounts operator : </b> + * <ul> + * <li>Top Count : 10, number of top unique Hashtags to be reported.</li> + * <li>Sliding window
count : 600, report over last 5 min (600 * .5 / 60 mins)</li> + * <li>window slide value : 1</li> + * </ul> + * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see similar output on console as below: + * + * <pre> + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": 
"{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished. + * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request + * </pre> + * + * Scaling Options : <br> + * User can scale application by setting intial partition size > 1 on count + * unique operator. 
<br> + * <br> + * + * Application DAG : <br> + * <img src="doc-files/Application.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 500ms(default) <br> + * Operator Details : <br> + * <ul> + * <li><b>The twitterFeed operator : </b> This operator samples random public + * statuses from twitter and emits them to the application. <br> + * Class : com.datatorrent.contrib.twitter.TwitterSampleInput <br> + * StateFull : No, window count 1 <br> + * </li> + * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtags from + * randomly sampled statuses from twitter. <br> + * Class : {@link TwitterStatusHashtagExtractor} <br> + * StateFull : No, window count 1 <br> + * </li> + * <li><b>The outputOp operator : </b> This operator sends the hashtags to kinesis. <br> + * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br> + * </li> + * <li><b>The inputOp operator : </b> This operator fetches the records from kinesis, + * converts them into hashtags and emits them. <br> + * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringInputOperator} <br> + * </li> + * <li><b>The uniqueCounter operator : </b> This operator aggregates the count for each + * Hashtag extracted from random samples. <br> + * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br> + * StateFull : No, window count 1 <br> + * </li> + * <li><b> The topCounts operator : </b> This operator calculates the top Hashtags over the last 5 + * min, with a window slide value of 1. <br> + * Class : org.apache.apex.examples.twitter.WindowedTopCounter <br> + * StateFull : Yes, sliding window count 600 (5 min) <br> + * </li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). 
<br>
 * </li>
 * </ul>
 *
 * @since 2.0.0
 */
@ApplicationAnnotation(name = "TwitterKinesisExample")
public class KinesisHashtagsApplication implements StreamingApplication
{
  // Left null on purpose or not, every stream that passes this to setLocality() gets a null locality.
  private final Locality locality = null;

  /**
   * Adds the sink operator for the results under the given name and returns its input port.
   * When the DAG carries a gateway address the sink publishes to the gateway's pubsub websocket
   * under the topic "examples.twitter.&lt;operatorName&gt;"; otherwise it is a console operator
   * that prefixes each tuple with the operator name.
   *
   * @param dag DAG to add the sink operator to
   * @param operatorName name of the operator, also used in the topic / console prefix
   * @return input port of the operator that was added
   */
  private InputPort<Object> consoleOutput(DAG dag, String operatorName)
  {
    final String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);

    // No gateway configured: print to the console.
    if (StringUtils.isEmpty(gatewayAddress)) {
      ConsoleOutputOperator console = dag.addOperator(operatorName, new ConsoleOutputOperator());
      console.setStringFormat(operatorName + ": %s");
      return console.input;
    }

    // URI and topic are built before the operator is added, so a malformed address leaves the DAG untouched.
    final URI pubSubUri = URI.create("ws://" + gatewayAddress + "/pubsub");
    final String pubSubTopic = "examples.twitter." + operatorName;
    PubSubWebSocketOutputOperator<Object> webSocketOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>());
    webSocketOut.setUri(pubSubUri);
    webSocketOut.setTopic(pubSubTopic);
    return webSocketOut.input;
  }

  /**
   * Builds two pipelines in one DAG: tweets -> hashtag extraction -> Kinesis, and
   * Kinesis -> unique hashtag counts -> windowed top 10 -> console / websocket sink.
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Producer side: sample tweets, extract their hashtags and push them to Kinesis.
    TwitterSampleInput tweetSampler = dag.addOperator("TweetSampler", new TwitterSampleInput());
    TwitterStatusHashtagExtractor hashtagExtractor = dag.addOperator("HashtagExtractor", TwitterStatusHashtagExtractor.class);

    KinesisStringOutputOperator toKinesis = dag.addOperator("ToKinesis", new KinesisStringOutputOperator());
    toKinesis.setBatchSize(500);

    // Statuses go from the sampler to the extractor inside the same container.
    dag.addStream("TweetStream", tweetSampler.status, hashtagExtractor.input).setLocality(Locality.CONTAINER_LOCAL);
    // Extracted hashtags are written to Kinesis.
    dag.addStream("SendToKinesis", hashtagExtractor.hashtags, toKinesis.inputPort).setLocality(locality);

    // Consumer side: read the hashtags back from Kinesis and rank them.
    KinesisStringInputOperator fromKinesis = dag.addOperator("FromKinesis", new KinesisStringInputOperator());
    fromKinesis.setShardManager(new ShardManager());
    fromKinesis.getConsumer().setRecordsLimit(600);
    fromKinesis.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString());

    // Counts each distinct hashtag.
    UniqueCounter<String> hashtagCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>());

    // Keeps the top 10 over a sliding width of 600 windows (5 minutes at the 500 ms default window).
    WindowedTopCounter<String> topCounter = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
    topCounter.setTopCount(10);
    topCounter.setSlidingWindowWidth(600);
    topCounter.setDagWindowWidth(1);

    dag.addStream("TwittedHashtags", fromKinesis.outputPort, hashtagCounter.data).setLocality(locality);
    dag.addStream("UniqueHashtagCounts", hashtagCounter.count, topCounter.input).setLocality(locality);
    dag.addStream("TopHashtags", topCounter.output, consoleOutput(dag, "topHashtags")).setLocality(locality);
  }
}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java new file mode 100644 index 0000000..dfcac81 --- 
/dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/SlidingContainer.java @@ -0,0 +1,70 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.examples.twitter;

import java.io.Serializable;

/**
 * Counter for one identifier, kept over a circular array of per-window slots.<br>
 * Developed for an example.<br>
 * {@link #adjustCount(int)} accumulates into the current slot; {@link #slide()} closes the
 * current slot, folds it into the running total and drops the slot that is being reused.
 * The running total therefore does not include the current slot until the next slide.
 *
 * @param <T> Type of object for which sliding window is being maintained.
 * @since 0.3.2
 */
public class SlidingContainer<T> implements Serializable
{
  private static final long serialVersionUID = 201305291751L;
  // Object whose occurrences are being counted.
  T identifier;
  // Running total maintained by slide(); see the class comment for what it covers.
  int totalCount;
  // Index of the slot that adjustCount() currently writes to.
  int position;
  // One counter per window slot, used as a ring buffer.
  int[] windowedCount;

  @SuppressWarnings("unused")
  private SlidingContainer()
  {
    /* needed for Kryo serialization */
  }

  /**
   * Creates a container with all counts at zero.
   *
   * @param identifier object the counts belong to
   * @param windowCount number of slots in the ring buffer
   */
  public SlidingContainer(T identifier, int windowCount)
  {
    this.identifier = identifier;
    this.totalCount = 0;
    this.position = 0;
    windowedCount = new int[windowCount];
  }

  /**
   * Adds {@code i} to the current slot. The running total is not touched here.
   *
   * @param i amount to add to the current slot
   */
  public void adjustCount(int i)
  {
    windowedCount[position] += i;
  }

  /**
   * Advances to the next slot, wrapping around at the end of the array.
   * The statement order matters: the finished slot is read before the position moves, and the
   * slot at the new position is subtracted from the total before it is cleared.
   */
  public void slide()
  {
    // Count accumulated in the slot that is being closed.
    int currentCount = windowedCount[position];
    position = position == windowedCount.length - 1 ? 0 : position + 1;
    // Add the closed slot and remove what the reused slot had contributed earlier.
    totalCount += currentCount - windowedCount[position];
    windowedCount[position] = 0;
  }

  @Override
  public String toString()
  {
    return identifier + " => " + totalCount;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java new file mode 100644 index 0000000..62ec6cf --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpApplication.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.twitter; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import javax.annotation.Nonnull; + +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; + +import twitter4j.Status; + +/** + * An application which connects to Twitter Sample Input and stores all the + * tweets with their usernames in a mysql database. Please review the docs + * for TwitterTopCounterApplication to setup your twitter credentials. You + * may also be able to change JDBCStore credentials using config file. + * + * You will also need to create appropriate database and tables with the + * following schema, also included in mysql.sql in resources: + * <pre> + * DROP TABLE if exists tweets; + * CREATE TABLE tweets ( + * window_id LONG NOT NULL, + * creation_date DATE, + * text VARCHAR(256) NOT NULL, + * userid VARCHAR(40) NOT NULL, + * KEY ( userid, creation_date) + * ); + * + * drop table if exists dt_window_id_tracker; + * CREATE TABLE dt_window_id_tracker ( + * dt_application_id VARCHAR(100) NOT NULL, + * dt_operator_id int(11) NOT NULL, + * dt_window_id bigint NOT NULL, + * UNIQUE (dt_application_id, dt_operator_id, dt_window_id) + * ) ENGINE=MyISAM DEFAULT CHARSET=latin1; + * </pre> + * + * @since 0.9.4 + */ +@ApplicationAnnotation(name = "TwitterDumpExample") +public class TwitterDumpApplication implements StreamingApplication +{ + /** + * Output operator that binds each received Status to {@link #INSERT_STATUS_STATEMENT}. + */ + public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status> + { + public static final String INSERT_STATUS_STATEMENT = "insert into tweets (window_id, creation_date, text, userid) values (?, ?, ?, ?)"; + + /** + * Points the inherited store at the dt_window_id_tracker table and names its three columns. + */ + public Status2Database() + { + 
store.setMetaTable("dt_window_id_tracker"); + store.setMetaTableAppIdColumn("dt_application_id"); + store.setMetaTableOperatorIdColumn("dt_operator_id"); + store.setMetaTableWindowColumn("dt_window_id"); + } + + /** + * @return the parameterized insert statement for the tweets table + */ + @Nonnull + @Override + protected String getUpdateCommand() + { + return INSERT_STATUS_STATEMENT; + } + + /** + * Fills the four placeholders in column order: window id, creation date, tweet text, screen name. + * + * @param statement the prepared insert to populate + * @param tuple the status supplying the values + * @throws SQLException if a parameter cannot be set or the row cannot be queued + */ + @Override + protected void setStatementParameters(PreparedStatement statement, Status tuple) throws SQLException + { + statement.setLong(1, currentWindowId); + + final long createdAtMillis = tuple.getCreatedAt().getTime(); + statement.setDate(2, new java.sql.Date(createdAtMillis)); + + final String tweetText = tuple.getText(); + statement.setString(3, tweetText); + + final String screenName = tuple.getUser().getScreenName(); + statement.setString(4, screenName); + + /* NOTE(review): the row is queued here; confirm the base operator does not also call addBatch() after this hook, which would queue it twice. */ + statement.addBatch(); + } + } + + /** + * Wires the Twitter sample input straight into the database writer, container-local. + * + * @param dag the DAG to populate + * @param conf launch configuration (not read here) + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + final TwitterSampleInput sampler = dag.addOperator("TweetSampler", new TwitterSampleInput()); + final Status2Database writer = dag.addOperator("DatabaseWriter", new Status2Database()); + + writer.getStore().setDatabaseDriver("com.mysql.jdbc.Driver"); + writer.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter"); + writer.getStore().setConnectionProperties("user:twitter"); + + dag.addStream("Statuses", sampler.status, writer.input).setLocality(Locality.CONTAINER_LOCAL); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java new file mode 100644 index 0000000..3169132 --- /dev/null +++ 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterDumpHBaseApplication.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Put; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator; +import com.datatorrent.contrib.twitter.TwitterSampleInput; + +import twitter4j.Status; + +/** + * An application which connects to Twitter Sample Input and stores all the + * tweets with their usernames in a hbase database. Please review the docs + * for TwitterTopCounterApplication to setup your twitter credentials. + * + * You need to create the HBase table to run this example. Table name can be + * configured but columnfamily must be 'cf' to make this example simple and complied + * with the mysql based example. 
+ * <pre> + * create 'tablename', 'cf' + * + * </pre> + * + * @since 1.0.2 + */ +@ApplicationAnnotation(name = "TwitterDumpHBaseExample") +public class TwitterDumpHBaseApplication implements StreamingApplication +{ + + /** + * Converts each Status into an HBase Put with the tweet text and the author's screen name in column family 'cf'. + */ + public static class Status2Hbase extends AbstractHBasePutOutputOperator<Status> + { + /* Explicit charset: the no-argument String.getBytes() uses the JVM default encoding, which varies by host. */ + private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8"); + + /** + * Builds the Put for one status. The row key is the 8-byte creation timestamp; cf:text holds the tweet + * text and cf:userid holds the author's screen name (previously the tweet text was written to both columns). + * + * @param t the status to store + * @return the Put carrying the text and userid cells + */ + @Override + public Put operationPut(Status t) + { + /* NOTE(review): statuses sharing a creation timestamp map to the same row key - confirm this is acceptable. */ + Put put = new Put(ByteBuffer.allocate(8).putLong(t.getCreatedAt().getTime()).array()); + put.add("cf".getBytes(UTF_8), "text".getBytes(UTF_8), t.getText().getBytes(UTF_8)); + put.add("cf".getBytes(UTF_8), "userid".getBytes(UTF_8), t.getUser().getScreenName().getBytes(UTF_8)); + return put; + } + + } + + + /** + * Wires the Twitter sample input straight into the HBase writer, container-local. + * + * @param dag the DAG to populate + * @param conf launch configuration (not read here) + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput()); + + Status2Hbase hBaseWriter = dag.addOperator("DatabaseWriter", new Status2Hbase()); + + dag.addStream("Statuses", twitterStream.status, hBaseWriter.input).setLocality(Locality.CONTAINER_LOCAL); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java new file mode 100644 index 0000000..7b468ec --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusHashtagExtractor.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import twitter4j.HashtagEntity; +import twitter4j.Status; + +/** + * Operator that emits the text of every hashtag entity carried by an incoming status. + * + * @since 1.0.2 + */ +public class TwitterStatusHashtagExtractor extends BaseOperator +{ + public final transient DefaultOutputPort<String> hashtags = new DefaultOutputPort<String>(); + public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>() + { + /** + * Emits one string per non-null hashtag entity; a status whose entity array is null emits nothing. + */ + @Override + public void process(Status status) + { + final HashtagEntity[] tags = status.getHashtagEntities(); + if (tags == null) { + return; + } + for (int i = 0; i < tags.length; i++) { + final HashtagEntity tag = tags[i]; + if (tag == null) { + continue; + } + hashtags.emit(tag.getText()); + } + } + + }; +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java new file mode 100644 index 0000000..40a7d3d --- /dev/null +++ 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusURLExtractor.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import twitter4j.Status; +import twitter4j.URLEntity; + +/** + * Operator that emits one URL string per URL entity carried by an incoming status. + * + * @since 0.3.2 + */ +public class TwitterStatusURLExtractor extends BaseOperator +{ + public final transient DefaultOutputPort<String> url = new DefaultOutputPort<String>(); + public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>() + { + /** + * Emits the expanded URL of each non-null entity, falling back to the plain URL when no expanded form exists. + */ + @Override + public void process(Status status) + { + final URLEntity[] links = status.getURLEntities(); + if (links == null) { + return; + } + for (int i = 0; i < links.length; i++) { + final URLEntity link = links[i]; + if (link == null) { /* null entries are skipped; the original author saw intermittent NPEs here */ + continue; + } + 
url.emit((link.getExpandedURL() != null ? link.getExpandedURL() : link.getURL()).toString()); + } + } + }; + + private static final Logger LOG = LoggerFactory.getLogger(TwitterStatusURLExtractor.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java new file mode 100644 index 0000000..6581b76 --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterStatusWordExtractor.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.twitter; + +import java.util.Arrays; +import java.util.HashSet; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Operator that splits incoming text on single spaces and emits every token not present in the filter list. + * + * @since 0.3.2 + */ +public class TwitterStatusWordExtractor extends BaseOperator +{ + public HashSet<String> filterList; + + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + /** + * Emits each space-separated token of the text unless the filter list contains it. + */ + @Override + public void process(String text) + { + for (String token : text.split(" ")) { + if (token == null || filterList.contains(token)) { + continue; + } + output.emit(token); + } + } + }; + + /** + * Rebuilds the filter list from the built-in set of common words and punctuation tokens. + * + * @param context operator context (not read here) + */ + @Override + public void setup(OperatorContext context) + { + final String[] ignoredTokens = {"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but", + "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when", + "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"}; + this.filterList = new HashSet<String>(Arrays.asList(ignoredTokens)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java new file mode 100644 index 0000000..ee43383 --- /dev/null +++ 
b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopCounterApplication.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Operator.OutputPort; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.algo.UniqueCounter; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; + +/** + * Twitter Example Application: <br> + * This example application samples 
random public status from twitter, send to url + * extractor. <br> + * Top 10 url(s) mentioned in tweets in last 5 mins are displayed on every + * window count (500ms).<br> + * <br> + * + * Real Time Calculation :<br> + * This application calculates top 10 url mentioned in tweets in last 5 + * minutes across a 1% random tweet sampling on a rolling window basis.<br> + * <br> + * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a> + * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml: + * <pre> + * {@code + * <?xml version="1.0" encoding="UTF-8"?> + * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + * <configuration> + * + * <property> <name>dt.operator.TweetSampler.consumerKey</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.consumerSecret</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.accessToken</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.accessTokenSecret</name> + * <value>TBD</value> </property> + * </configuration> + * } + * </pre> + * Custom Attributes: <br> + * <b>topCounts operator : <b> + * <ul> + * <li>Top Count : 10, number of top unique url to be reported.</li> + * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li> + * <li>window slide value : 1</li> + * </ul> + * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see following output on + * console: + * + * <pre> + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, 
http://uranaitter.com=4} + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} + * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} + * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished. 
+ * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request + * </pre> + * + * Scaling Options : <br> + * User can scale application by setting initial partition size > 1 on count + * unique operator. <br> + * <br> + * + * Application DAG : <br> + * <img src="doc-files/Application.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 500ms(default) <br> + * Operator Details : <br> + * <ul> + * <li><b>The twitterFeed operator : </b> This operator samples random public + * statuses from twitter and emits to application. <br> + * Class : com.datatorrent.contrib.twitter.TwitterSampleInput <br> + * Stateful : No, window count 1 <br> + * </li> + * <li><b>The urlExtractor operator : </b> This operator extracts url from + * random sampled statuses from twitter. <br> + * Class : {@link TwitterStatusURLExtractor} <br> + * Stateful : No, window count 1 <br> + * </li> + * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each + * url extracted from random samples. <br> + * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br> + * Stateful : No, window count 1 <br> + * </li> + * <li><b> The topCounts operator : </b> This operator calculates top url in last 1 + * min sliding window count 1. <br> + * Class : org.apache.apex.examples.twitter.WindowedTopCounter <br> + * Stateful : Yes, sliding window count 120 (1 min) <br> + * </li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). 
<br> + * </li> + * </ul> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME) +public class TwitterTopCounterApplication implements StreamingApplication +{ + public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json"; + public static final String CONVERSION_SCHEMA = "twitterURLConverterSchema.json"; + public static final String APP_NAME = "TwitterExample"; + + private final Locality locality = null; + + /** + * Builds the pipeline: tweet sampler, URL extractor, unique counter, top counter, then the result sink. + * + * @param dag the DAG to populate + * @param conf launch configuration (not read here) + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + /* Source of sampled statuses. */ + final TwitterSampleInput tweetSource = dag.addOperator("TweetSampler", new TwitterSampleInput()); + + /* Pulls the URLs out of each status. */ + final TwitterStatusURLExtractor extractor = dag.addOperator("URLExtractor", TwitterStatusURLExtractor.class); + + /* Counts unique URLs over an application window of 600 streaming windows, sliding by 1. */ + final UniqueCounter<String> urlCounter = dag.addOperator("UniqueURLCounter", new UniqueCounter<String>()); + dag.setAttribute(urlCounter, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 600); + dag.setAttribute(urlCounter, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 1); + + /* Keeps the 10 highest counts. */ + final WindowedTopCounter<String> topUrls = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); + topUrls.setTopCount(10); + topUrls.setSlidingWindowWidth(1); + topUrls.setDagWindowWidth(1); + + /* Statuses flow into the extractor inside the same container. */ + 
dag.addStream("TweetStream", tweetSource.status, extractor.input).setLocality(Locality.CONTAINER_LOCAL); + /* Extracted URLs flow into the unique counter; the locality field is null here. */ + dag.addStream("TwittedURLs", extractor.url, urlCounter.data).setLocality(locality); + /* Per-URL counts flow into the top counter. */ + dag.addStream("UniqueURLCounts", urlCounter.count, topUrls.input); + + consoleOutput(dag, "topURLs", topUrls.output, SNAPSHOT_SCHEMA, "url"); + } + + /** + * Attaches a sink to the given port: a console operator when no gateway address is set on the DAG, + * otherwise a snapshot server answering queries over the gateway's pubsub web socket. + * + * @param dag the DAG to extend + * @param operatorName name of the console operator and prefix of its output format + * @param topCount the port producing the rows to publish + * @param schemaFile jar resource holding the snapshot schema JSON + * @param alias table field name mapped to {@code WindowedTopCounter.FIELD_TYPE} + */ + public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias) + { + final String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + + if (StringUtils.isEmpty(gatewayAddress)) { + /* No gateway: print the rows instead. */ + final ConsoleOutputOperator console = dag.addOperator(operatorName, new ConsoleOutputOperator()); + console.setStringFormat(operatorName + ": %s"); + + dag.addStream("MapProvider", topCount, console.input); + return; + } + + final URI pubSubUri = URI.create("ws://" + gatewayAddress + "/pubsub"); + + final AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap()); + + final Map<String, String> fieldMapping = Maps.newHashMap(); + fieldMapping.put(alias, WindowedTopCounter.FIELD_TYPE); + final String schemaJson = SchemaUtils.jarResourceFileToString(schemaFile); + + snapshotServer.setSnapshotSchemaJSON(schemaJson); + snapshotServer.setTableFieldToMapField(fieldMapping); + + final PubSubWebSocketAppDataQuery queryClient = new PubSubWebSocketAppDataQuery(); + queryClient.setUri(pubSubUri); + snapshotServer.setEmbeddableQueryInfoProvider(queryClient); + + final PubSubWebSocketAppDataResult resultClient = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); + resultClient.setUri(pubSubUri); + final Operator.InputPort<String> resultPort = resultClient.input; + + dag.addStream("MapProvider", topCount, snapshotServer.input); + dag.addStream("Result", snapshotServer.queryResult, resultPort); + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java new file mode 100644 index 0000000..fb274ea --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTopWordsApplication.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.twitter; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.algo.UniqueCounter; + +/** + * Companion to {@link TwitterTopCounterApplication} that feeds tweet text through a word extractor + * instead of a URL extractor. <br> + * Run Sample : + * + * <pre> + * 2013-06-17 16:50:34,911 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl info - Connection established. + * 2013-06-17 16:50:34,912 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl info - Receiving status stream. + * topWords: {} + * topWords: {love=1, ate=1, catch=1, calma=1, Phillies=1, ela=1, from=1, running=1} + * </pre> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME) +public class TwitterTopWordsApplication implements StreamingApplication +{ + public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json"; + public static final String CONVERSION_SCHEMA = "twitterWordConverterSchema.json"; + public static final String APP_NAME = "RollingTopWordsExample"; + public static final String PROP_USE_APPDATA = "dt.application." 
+ APP_NAME + ".useAppData"; + + /** + * Builds the pipeline: tweet sampler, word extractor, unique counter, top counter, then the shared result sink. + * + * @param dag the DAG to populate + * @param conf launch configuration (not read here) + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + final TwitterSampleInput tweetSource = dag.addOperator("TweetSampler", new TwitterSampleInput()); + final TwitterStatusWordExtractor splitter = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class); + final UniqueCounter<String> wordCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>()); + final WindowedTopCounter<String> topWords = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); + + topWords.setSlidingWindowWidth(120); + topWords.setDagWindowWidth(1); + + dag.addStream("TweetStream", tweetSource.text, splitter.input); + dag.addStream("TwittedWords", splitter.output, wordCounter.data); + /* Only the counter-to-top-counter hop is pinned to a single container. */ + dag.addStream("UniqueWordCounts", wordCounter.count, topWords.input).setLocality(Locality.CONTAINER_LOCAL); + + TwitterTopCounterApplication.consoleOutput(dag, "topWords", topWords.output, SNAPSHOT_SCHEMA, "word"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java new file mode 100644 index 0000000..7a03a64 --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/TwitterTrendingHashtagsApplication.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.twitter.TwitterSampleInput; +import com.datatorrent.lib.algo.UniqueCounter; + +/** + * Twitter Example Application: <br> + * This example application samples random public status from twitter, send to Hashtag + * extractor. <br> + * Top 10 Hashtag(s) mentioned in tweets in last 5 mins are displayed on every + * window count (500ms).<br> + * <br> + * + * Real Time Calculation :<br> + * This application calculates top 10 Hashtag mentioned in tweets in last 5 + * minutes across a 1% random tweet sampling on a rolling window basis.<br> + * <br> + * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a> + * and configure the authentication. 
For launch from CLI, those go into ~/.dt/dt-site.xml: + * <pre> + * {@code + * <?xml version="1.0" encoding="UTF-8"?> + * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + * <configuration> + * + * <property> <name>dt.operator.TweetSampler.prop.consumerKey</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.prop.accessToken</name> + * <value>TBD</value> </property> + * + * <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name> + * <value>TBD</value> </property> + * </configuration> + * } + * </pre> + * Custom Attributes: <br> + * <b>topCounts operator : <b> + * <ul> + * <li>Top Count : 10, number of top unique Hashtag to be reported.</li> + * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li> + * <li>window slide value : 1</li> + * </ul> + * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see similar output on console as below: + * + * <pre> + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": 
"{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": 
"{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} + * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished. + * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request + * </pre> + * + * Scaling Options : <br> + * User can scale application by setting intial partition size > 1 on count + * unique operator. <br> + * <br> + * + * Application DAG : <br> + * <img src="doc-files/Application.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 500ms(default) <br> + * Operator Details : <br> + * <ul> + * <li><b>The twitterFeed operator : </b> This operator samples random public + * statues from twitter and emits to application. <br> + * Class : com.datatorrent.examples.twitter.TwitterSampleInput <br> + * StateFull : No, window count 1 <br> + * </li> + * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from + * random sampled statues from twitter. <br> + * Class : {@link TwitterStatusHashtagExtractor} <br> + * StateFull : No, window count 1 <br> + * </li> + * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each + * Hashtag extracted from random samples. <br> + * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br> + * StateFull : No, window count 1 <br> + * </li> + * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1 + * min sliding window count 1. 
<br> + * Class : com.datatorrent.lib.algo.WindowedTopCounter <br> + * StateFull : Yes, sliding window count 120 (1 min) <br> + * </li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). <br> + * </li> + * </ul> + * + * @since 1.0.2 + */ +@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME) +public class TwitterTrendingHashtagsApplication implements StreamingApplication +{ + public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json"; + public static final String CONVERSION_SCHEMA = "twitterHashTagConverterSchema.json"; + public static final String APP_NAME = "TwitterTrendingExample"; + public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData"; + + private final Locality locality = null; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Setup the operator to get the data from twitter sample stream injected into the system. + TwitterSampleInput twitterFeed = new TwitterSampleInput(); + twitterFeed = dag.addOperator("TweetSampler", twitterFeed); + + // Setup a node to count the unique Hashtags within a window. + UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>()); + + // Get the aggregated Hashtag counts and count them over last 5 mins. 
+ WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); + topCounts.setTopCount(10); + topCounts.setSlidingWindowWidth(600); + topCounts.setDagWindowWidth(1); + + dag.addStream("TwittedHashtags", twitterFeed.hashtag, uniqueCounter.data).setLocality(locality); + // Count unique Hashtags + dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input); + + TwitterTopCounterApplication.consoleOutput(dag, "topHashtags", topCounts.output, SNAPSHOT_SCHEMA, "hashtag"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java new file mode 100644 index 0000000..0c3f481 --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/URLSerDe.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.twitter; + +import java.nio.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; + +/** + * <p>URLSerDe class.</p> + * + * @since 0.3.2 + */ +public class URLSerDe implements StreamCodec<byte[]> +{ + /** + * Covert the bytes into object useful for downstream node. + * + * @param fragment + * @return WindowedURLHolder object which represents the bytes. + */ + @Override + public byte[] fromByteArray(Slice fragment) + { + if (fragment == null || fragment.buffer == null) { + return null; + } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) { + return fragment.buffer; + } else { + byte[] buffer = new byte[fragment.buffer.length]; + System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length); + return buffer; + } + } + + /** + * Cast the input object to byte[]. + * + * @param object - byte array representing the bytes of the string + * @return the same object as input + */ + @Override + public Slice toByteArray(byte[] object) + { + return new Slice(object, 0, object.length); + } + + @Override + public int getPartition(byte[] object) + { + ByteBuffer bb = ByteBuffer.wrap(object); + return bb.hashCode(); + } + + private static final Logger logger = LoggerFactory.getLogger(URLSerDe.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java new file mode 100644 index 0000000..8ecd6ae --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/WindowedTopCounter.java @@ -0,0 +1,282 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.twitter; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * + * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size. + * The operator expects to receive a map object which contains a set of objects mapped to their respective frequency of + * occurrences. e.g. if we are looking at most commonly occurring names then the operator expects to receive the tuples + * of type Map<String, Intenger> on its input port, and at the end of the window it emits 1 object of type Map<String, Integer> + * with a pre determined size. The emitted object contains the most frequently occurring keys. 
+ * + * @param <T> Type of the key in the map object which is accepted on input port as payload. Note that this key must be HashMap friendly. + * @since 0.3.2 + */ +public class WindowedTopCounter<T> extends BaseOperator +{ + public static final String FIELD_TYPE = "type"; + public static final String FIELD_COUNT = "count"; + + private static final Logger logger = LoggerFactory.getLogger(WindowedTopCounter.class); + + private PriorityQueue<SlidingContainer<T>> topCounter; + private int windows; + private int topCount = 10; + private int slidingWindowWidth; + private int dagWindowWidth; + private HashMap<T, SlidingContainer<T>> objects = new HashMap<T, SlidingContainer<T>>(); + + /** + * Input port on which map objects containing keys with their respective frequency as values will be accepted. + */ + public final transient DefaultInputPort<Map<T, Integer>> input = new DefaultInputPort<Map<T, Integer>>() + { + @Override + public void process(Map<T, Integer> map) + { + for (Map.Entry<T, Integer> e : map.entrySet()) { + SlidingContainer<T> holder = objects.get(e.getKey()); + if (holder == null) { + holder = new SlidingContainer<T>(e.getKey(), windows); + objects.put(e.getKey(), holder); + } + holder.adjustCount(e.getValue()); + } + } + }; + + public final transient DefaultOutputPort<List<Map<String, Object>>> output = new DefaultOutputPort<List<Map<String, Object>>>(); + + @Override + public void setup(OperatorContext context) + { + windows = (int)(slidingWindowWidth / dagWindowWidth) + 1; + if (slidingWindowWidth % dagWindowWidth != 0) { + logger.warn("slidingWindowWidth(" + slidingWindowWidth + ") is not exact multiple of dagWindowWidth(" + dagWindowWidth + ")"); + } + + topCounter = new PriorityQueue<SlidingContainer<T>>(this.topCount, new TopSpotComparator()); + } + + @Override + public void beginWindow(long windowId) + { + topCounter.clear(); + } + + @Override + public void endWindow() + { + Iterator<Map.Entry<T, SlidingContainer<T>>> iterator = 
objects.entrySet().iterator(); + int i = topCount; + + /* + * Try to fill the priority queue with the first topCount URLs. + */ + SlidingContainer<T> holder; + while (iterator.hasNext()) { + holder = iterator.next().getValue(); + holder.slide(); + + if (holder.totalCount == 0) { + iterator.remove(); + } else { + topCounter.add(holder); + if (--i == 0) { + break; + } + } + } + logger.debug("objects.size(): {}", objects.size()); + + /* + * Make room for the new element in the priority queue by deleting the + * smallest one, if we KNOW that the new element is useful to us. + */ + if (i == 0) { + int smallest = topCounter.peek().totalCount; + while (iterator.hasNext()) { + holder = iterator.next().getValue(); + holder.slide(); + + if (holder.totalCount > smallest) { + topCounter.poll(); + topCounter.add(holder); + smallest = topCounter.peek().totalCount; + } else if (holder.totalCount == 0) { + iterator.remove(); + } + } + } + + List<Map<String, Object>> data = Lists.newArrayList(); + + Iterator<SlidingContainer<T>> topIter = topCounter.iterator(); + + while (topIter.hasNext()) { + final SlidingContainer<T> wh = topIter.next(); + Map<String, Object> tableRow = Maps.newHashMap(); + + tableRow.put(FIELD_TYPE, wh.identifier.toString()); + tableRow.put(FIELD_COUNT, wh.totalCount); + + data.add(tableRow); + } + + Collections.sort(data, TwitterOutputSorter.INSTANCE); + + output.emit(data); + topCounter.clear(); + } + + @Override + public void teardown() + { + topCounter = null; + objects = null; + } + + /** + * Set the count of most frequently occurring keys to emit per map object. + * + * @param count count of the objects in the map emitted at the output port. 
+ */ + public void setTopCount(int count) + { + topCount = count; + } + + public int getTopCount() + { + return topCount; + } + + /** + * @return the windows + */ + public int getWindows() + { + return windows; + } + + /** + * @param windows the windows to set + */ + public void setWindows(int windows) + { + this.windows = windows; + } + + /** + * @return the slidingWindowWidth + */ + public int getSlidingWindowWidth() + { + return slidingWindowWidth; + } + + /** + * Set the width of the sliding window. + * + * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently + * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms, + * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000. + * + * @param slidingWindowWidth - Sliding window width to be set for this operator, recommended to be multiple of DAG window. + */ + public void setSlidingWindowWidth(int slidingWindowWidth) + { + this.slidingWindowWidth = slidingWindowWidth; + } + + /** + * @return the dagWindowWidth + */ + public int getDagWindowWidth() + { + return dagWindowWidth; + } + + /** + * Set the width of the sliding window. + * + * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently + * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms, + * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000. + * + * @param dagWindowWidth - DAG's native window width. It has to be the value of the native window set at the application level. 
+ */ + public void setDagWindowWidth(int dagWindowWidth) + { + this.dagWindowWidth = dagWindowWidth; + } + + static class TopSpotComparator implements Comparator<SlidingContainer<?>> + { + @Override + public int compare(SlidingContainer<?> o1, SlidingContainer<?> o2) + { + if (o1.totalCount > o2.totalCount) { + return 1; + } else if (o1.totalCount < o2.totalCount) { + return -1; + } + + return 0; + } + } + + private static class TwitterOutputSorter implements Comparator<Map<String, Object>> + { + public static final TwitterOutputSorter INSTANCE = new TwitterOutputSorter(); + + private TwitterOutputSorter() + { + } + + @Override + public int compare(Map<String, Object> o1, Map<String, Object> o2) + { + Integer count1 = (Integer)o1.get(FIELD_COUNT); + Integer count2 = (Integer)o2.get(FIELD_COUNT); + + return count1.compareTo(count2); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif new file mode 100644 index 0000000..d21e1d9 Binary files /dev/null and b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/doc-files/Application.gif differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java new file mode 100644 index 0000000..956ff08 --- /dev/null +++ b/examples/twitter/src/main/java/org/apache/apex/examples/twitter/package-info.java @@ -0,0 
+1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Twitter top URL's demonstration application. + */ +package org.apache.apex.examples.twitter; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml ---------------------------------------------------------------------- diff --git a/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml b/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml new file mode 100644 index 0000000..47b6531 --- /dev/null +++ b/examples/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml @@ -0,0 +1,52 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> + +<!-- properties for the twitter kinesis example --> +<configuration> + + <property> + <name>dt.operator.FromKinesis.streamName</name> + <value>TwitterTag</value> + </property> + <property> + <name>dt.operator.FromKinesis.accessKey</name> + </property> + <property> + <name>dt.operator.FromKinesis.secretKey</name> + </property> + <property> + <name>dt.operator.FromKinesis.endPoint</name> + </property> + <property> + <name>dt.operator.ToKinesis.streamName</name> + <value>TwitterTag</value> + </property> + <property> + <name>dt.operator.ToKinesis.accessKey</name> + </property> + <property> + <name>dt.operator.ToKinesis.secretKey</name> + </property> + <property> + <name>dt.operator.ToKinesis.endPoint</name> + </property> + +</configuration>
