http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java b/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java deleted file mode 100644 index 7bbb8ec..0000000 --- a/demos/sql/src/test/java/org/apache/apex/malhar/sql/sample/SQLApplicationWithModelFileTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.sql.sample; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.TimeZone; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.base.Predicates; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; - -import com.datatorrent.api.LocalMode; - -public class SQLApplicationWithModelFileTest -{ - private TimeZone defaultTZ; - - @Before - public void setUp() throws Exception - { - defaultTZ = TimeZone.getDefault(); - TimeZone.setDefault(TimeZone.getTimeZone("GMT")); - } - - @After - public void tearDown() throws Exception - { - TimeZone.setDefault(defaultTZ); - } - - @Test - public void test() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SQLApplicationWithModelFile.xml")); - - SQLApplicationWithModelFile app = new SQLApplicationWithModelFile(); - - lma.prepareDAG(app, conf); - - LocalMode.Controller lc = lma.getController(); - - PrintStream originalSysout = System.out; - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - System.setOut(new PrintStream(baos)); - - lc.runAsync(); - waitTillStdoutIsPopulated(baos, 30000); - lc.shutdown(); - - System.setOut(originalSysout); - - String[] sout = baos.toString().split(System.lineSeparator()); - Collection<String> filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); - - String[] actualLines = filter.toArray(new String[filter.size()]); - Assert.assertTrue(actualLines[0].contains("RowTime=Mon Feb 15 10:15:00 GMT 2016, Product=paint1")); 
- Assert.assertTrue(actualLines[1].contains("RowTime=Mon Feb 15 10:16:00 GMT 2016, Product=paint2")); - Assert.assertTrue(actualLines[2].contains("RowTime=Mon Feb 15 10:17:00 GMT 2016, Product=paint3")); - Assert.assertTrue(actualLines[3].contains("RowTime=Mon Feb 15 10:18:00 GMT 2016, Product=paint4")); - Assert.assertTrue(actualLines[4].contains("RowTime=Mon Feb 15 10:19:00 GMT 2016, Product=paint5")); - Assert.assertTrue(actualLines[5].contains("RowTime=Mon Feb 15 10:10:00 GMT 2016, Product=abcde6")); - } - - public static boolean waitTillStdoutIsPopulated(ByteArrayOutputStream baos, int timeout) throws InterruptedException, - IOException - { - long now = System.currentTimeMillis(); - Collection<String> filter = Lists.newArrayList(); - while (System.currentTimeMillis() - now < timeout) { - baos.flush(); - String[] sout = baos.toString().split(System.lineSeparator()); - filter = Collections2.filter(Arrays.asList(sout), Predicates.containsPattern("Delta Record:")); - if (filter.size() != 0) { - break; - } - - Thread.sleep(500); - } - - return (filter.size() != 0); - } - -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/resources/input.csv ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/resources/input.csv b/demos/sql/src/test/resources/input.csv deleted file mode 100644 index c4786d1..0000000 --- a/demos/sql/src/test/resources/input.csv +++ /dev/null @@ -1,6 +0,0 @@ -15/02/2016 10:15:00 +0000,1,paint1,11 -15/02/2016 10:16:00 +0000,2,paint2,12 -15/02/2016 10:17:00 +0000,3,paint3,13 -15/02/2016 10:18:00 +0000,4,paint4,14 -15/02/2016 10:19:00 +0000,5,paint5,15 -15/02/2016 10:10:00 +0000,6,abcde6,16 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/sql/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/sql/src/test/resources/log4j.properties b/demos/sql/src/test/resources/log4j.properties deleted file mode 100644 index 8ea3cfe..0000000 --- a/demos/sql/src/test/resources/log4j.properties +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=WARN -test.log.console.threshold=WARN - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=INFO -log4j.logger.org.apache.apex=INFO - -log4j.logger.org.apache.calcite=WARN -log4j.logger.org.apache.kafka=WARN -log4j.logger.org.I0Itec.zkclient.ZkClient=WARN -log4j.logger.org.apache.zookeeper=WARN -log4j.logger.kafka=WARN -log4j.logger.kafka.consumer=WARN http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/pom.xml ---------------------------------------------------------------------- diff --git a/demos/twitter/pom.xml b/demos/twitter/pom.xml deleted file mode 100644 index 767d809..0000000 --- a/demos/twitter/pom.xml +++ /dev/null @@ -1,101 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>twitter-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Twitter Demo</name> - <description>Twitter Rolling Top Words application demonstrates real-time computations over a sliding window.</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <!-- required by twitter demo --> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-core</artifactId> - <version>4.0.6</version> - </dependency> - <dependency> - <!-- required by twitter demo --> - <groupId>org.twitter4j</groupId> - <artifactId>twitter4j-stream</artifactId> - <version>4.0.6</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase</artifactId> - <version>0.94.20</version> - <type>jar</type> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>5.1.22</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>${project.version}</version> 
- <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-kinesis</artifactId> - <version>1.9.10</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>2.4.4</version> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>it.unimi.dsi</groupId> - <artifactId>fastutil</artifactId> - <version>6.6.4</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/twitter/src/assemble/appPackage.xml b/demos/twitter/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/twitter/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java deleted file mode 100644 index b9d32ab..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import java.net.URI; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.Operator.InputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.kinesis.AbstractKinesisInputOperator; -import com.datatorrent.contrib.kinesis.KinesisStringInputOperator; -import com.datatorrent.contrib.kinesis.KinesisStringOutputOperator; -import com.datatorrent.contrib.kinesis.ShardManager; -import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.lib.algo.UniqueCounter; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; - -/** - * Twitter Demo Application: <br> - * This demo application samples random public status from twitter, send to Hashtag - * extractor and extract the status and send it into kinesis <br> - * Get the records from kinesis and converts into Hashtags. 
Top 10 Hashtag(s) mentioned in - * tweets in last 5 mins are displayed on every window count (500ms).<br> - * <br> - * - * Real Time Calculation :<br> - * This application calculates top 10 Hashtag mentioned in tweets in last 5 - * minutes across a 1% random tweet sampling on a rolling window basis.<br> - * <br> - * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a>, - * <a href="https://http://aws.amazon.com/">AWS Account</a> and configure the authentication details. - * For launch from CLI, those go into ~/.dt/dt-site.xml: - * <pre> - * {@code - * <?xml version="1.0" encoding="UTF-8"?> - * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> - * <configuration> - * - * <property> <name>dt.operator.TweetSampler.prop.consumerKey</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.prop.accessToken</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.FromKinesis.streamName</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.FromKinesis.accessKey</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.FromKinesis.secretKey</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.ToKinesis.streamName</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.ToKinesis.accessKey</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.ToKinesis.secretKey</name> - * <value>TBD</value> </property> - * - * </configuration> - * } - * </pre> - * Custom Attributes: <br> - * <b>topCounts operator : <b> - * <ul> - * <li>Top Count : 10, number of top unique Hashtag to be reported.</li> - * <li>Sliding window 
count : 600, report over last 5 min (600 * .5 / 60 mins)</li> - * <li>window slide value : 1</li> - * </ul> - * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see similar output on console as below: - * - * <pre> - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": 
"{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished. - * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request - * </pre> - * - * Scaling Options : <br> - * User can scale application by setting intial partition size > 1 on count - * unique operator. 
<br> - * <br> - * - * Application DAG : <br> - * <img src="doc-files/Application.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 500ms(default) <br> - * Operator Details : <br> - * <ul> - * <li><b>The twitterFeed operator : </b> This operator samples random public - * statues from twitter and emits to application. <br> - * Class : com.datatorrent.demos.twitter.TwitterSampleInput <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from - * random sampled statues from twitter. <br> - * Class : {@link com.datatorrent.demos.twitter.TwitterStatusHashtagExtractor} <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b>The outputOp operator : </b> This operator sent the tags into the kinesis. <br> - * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br> - * </li> - * <li><b>The inputOp operator : </b> This operator fetches the records from kinesis and - * converts into hastags and emits them. <br> - * Class : {@link com.datatorrent.contrib.kinesis.KinesisStringOutputOperator} <br> - * </li> - * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each - * Hashtag extracted from random samples. <br> - * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1 - * min sliding window count 1. <br> - * Class : com.datatorrent.lib.algo.WindowedTopCounter <br> - * StateFull : Yes, sliding window count 120 (1 min) <br> - * </li> - * <li><b>The operator Console: </b> This operator just outputs the input tuples - * to the console (or stdout). 
<br> - * </li> - * </ul> - * - * @since 2.0.0 - */ -@ApplicationAnnotation(name = "TwitterKinesisDemo") -public class KinesisHashtagsApplication implements StreamingApplication -{ - private final Locality locality = null; - - private InputPort<Object> consoleOutput(DAG dag, String operatorName) - { - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if (!StringUtils.isEmpty(gatewayAddress)) { - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); - String topic = "demos.twitter." + operatorName; - //LOG.info("WebSocket with gateway at: {}", gatewayAddress); - PubSubWebSocketOutputOperator<Object> wsOut = dag.addOperator(operatorName, new PubSubWebSocketOutputOperator<Object>()); - wsOut.setUri(uri); - wsOut.setTopic(topic); - return wsOut.input; - } - ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator()); - operator.setStringFormat(operatorName + ": %s"); - return operator.input; - } - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Setup the operator to get the data from twitter sample stream injected into the system. - TwitterSampleInput twitterFeed = new TwitterSampleInput(); - twitterFeed = dag.addOperator("TweetSampler", twitterFeed); - - // Setup the operator to get the Hashtags extracted from the twitter statuses - TwitterStatusHashtagExtractor HashtagExtractor = dag.addOperator("HashtagExtractor", TwitterStatusHashtagExtractor.class); - - //Setup the operator send the twitter statuses to kinesis - KinesisStringOutputOperator outputOp = dag.addOperator("ToKinesis", new KinesisStringOutputOperator()); - outputOp.setBatchSize(500); - - // Feed the statuses from feed into the input of the Hashtag extractor. 
- dag.addStream("TweetStream", twitterFeed.status, HashtagExtractor.input).setLocality(Locality.CONTAINER_LOCAL); - // Start counting the Hashtags coming out of Hashtag extractor - dag.addStream("SendToKinesis", HashtagExtractor.hashtags, outputOp.inputPort).setLocality(locality); - - //------------------------------------------------------------------------------------------ - - KinesisStringInputOperator inputOp = dag.addOperator("FromKinesis", new KinesisStringInputOperator()); - ShardManager shardStats = new ShardManager(); - inputOp.setShardManager(shardStats); - inputOp.getConsumer().setRecordsLimit(600); - inputOp.setStrategy(AbstractKinesisInputOperator.PartitionStrategy.MANY_TO_ONE.toString()); - - // Setup a node to count the unique Hashtags within a window. - UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>()); - - // Get the aggregated Hashtag counts and count them over last 5 mins. - WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); - topCounts.setTopCount(10); - topCounts.setSlidingWindowWidth(600); - topCounts.setDagWindowWidth(1); - - dag.addStream("TwittedHashtags", inputOp.outputPort, uniqueCounter.data).setLocality(locality); - - // Count unique Hashtags - dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input).setLocality(locality); - // Count top 10 - dag.addStream("TopHashtags", topCounts.output, consoleOutput(dag, "topHashtags")).setLocality(locality); - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java deleted file mode 100644 index 8b9f447..0000000 --- 
a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import java.io.Serializable; - -/** - * Developed for a demo<br> - * - * @param <T> Type of object for which sliding window is being maintained. - * @since 0.3.2 - */ -public class SlidingContainer<T> implements Serializable -{ - private static final long serialVersionUID = 201305291751L; - T identifier; - int totalCount; - int position; - int[] windowedCount; - - @SuppressWarnings("unused") - private SlidingContainer() - { - /* needed for Kryo serialization */ - } - - public SlidingContainer(T identifier, int windowCount) - { - this.identifier = identifier; - this.totalCount = 0; - this.position = 0; - windowedCount = new int[windowCount]; - } - - public void adjustCount(int i) - { - windowedCount[position] += i; - } - - public void slide() - { - int currentCount = windowedCount[position]; - position = position == windowedCount.length - 1 ? 
0 : position + 1; - totalCount += currentCount - windowedCount[position]; - windowedCount[position] = 0; - } - - @Override - public String toString() - { - return identifier + " => " + totalCount; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java deleted file mode 100644 index 9edce64..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.twitter; - -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import javax.annotation.Nonnull; - -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; - -import twitter4j.Status; - -/** - * An application which connects to Twitter Sample Input and stores all the - * tweets with their usernames in a mysql database. Please review the docs - * for TwitterTopCounterApplication to setup your twitter credentials. You - * may also be able to change JDBCStore credentials using config file. - * - * You will also need to create appropriate database and tables with the - * following schema, also included in mysql.sql in resources: - * <pre> - * DROP TABLE if exists tweets; - * CREATE TABLE tweets ( - * window_id LONG NOT NULL, - * creation_date DATE, - * text VARCHAR(256) NOT NULL, - * userid VARCHAR(40) NOT NULL, - * KEY ( userid, creation_date) - * ); - * - * drop table if exists dt_window_id_tracker; - * CREATE TABLE dt_window_id_tracker ( - * dt_application_id VARCHAR(100) NOT NULL, - * dt_operator_id int(11) NOT NULL, - * dt_window_id bigint NOT NULL, - * UNIQUE (dt_application_id, dt_operator_id, dt_window_id) - * ) ENGINE=MyISAM DEFAULT CHARSET=latin1; - * </pre> - * - * @since 0.9.4 - */ -@ApplicationAnnotation(name = "TwitterDumpDemo") -public class TwitterDumpApplication implements StreamingApplication -{ - public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status> - { - public static final String INSERT_STATUS_STATEMENT = "insert into tweets (window_id, creation_date, text, userid) values (?, ?, ?, ?)"; - - public Status2Database() - { - 
store.setMetaTable("dt_window_id_tracker"); - store.setMetaTableAppIdColumn("dt_application_id"); - store.setMetaTableOperatorIdColumn("dt_operator_id"); - store.setMetaTableWindowColumn("dt_window_id"); - } - - @Nonnull - @Override - protected String getUpdateCommand() - { - return INSERT_STATUS_STATEMENT; - } - - @Override - protected void setStatementParameters(PreparedStatement statement, Status tuple) throws SQLException - { - statement.setLong(1, currentWindowId); - - statement.setDate(2, new java.sql.Date(tuple.getCreatedAt().getTime())); - statement.setString(3, tuple.getText()); - statement.setString(4, tuple.getUser().getScreenName()); - statement.addBatch(); - } - } - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump"); - - TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput()); - - //ConsoleOutputOperator dbWriter = dag.addOperator("DatabaseWriter", new ConsoleOutputOperator()); - - Status2Database dbWriter = dag.addOperator("DatabaseWriter", new Status2Database()); - dbWriter.getStore().setDatabaseDriver("com.mysql.jdbc.Driver"); - dbWriter.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter"); - dbWriter.getStore().setConnectionProperties("user:twitter"); - - dag.addStream("Statuses", twitterStream.status, dbWriter.input).setLocality(Locality.CONTAINER_LOCAL); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java deleted file mode 100644 index 3adbbe0..0000000 --- 
a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import java.nio.ByteBuffer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Put; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator; -import com.datatorrent.contrib.twitter.TwitterSampleInput; - -import twitter4j.Status; - -/** - * An application which connects to Twitter Sample Input and stores all the - * tweets with their usernames in a hbase database. Please review the docs - * for TwitterTopCounterApplication to setup your twitter credentials. - * - * You need to create the HBase table to run this demo. Table name can be - * configured but columnfamily must be 'cf' to make this demo simple and complied - * with the mysql based demo. 
- * create 'tablename', 'cf' - * - * </pre> - * - * @since 1.0.2 - */ -@ApplicationAnnotation(name = "TwitterDumpHBaseDemo") -public class TwitterDumpHBaseApplication implements StreamingApplication -{ - - public static class Status2Hbase extends AbstractHBasePutOutputOperator<Status> - { - - @Override - public Put operationPut(Status t) - { - Put put = new Put(ByteBuffer.allocate(8).putLong(t.getCreatedAt().getTime()).array()); - put.add("cf".getBytes(), "text".getBytes(), t.getText().getBytes()); - put.add("cf".getBytes(), "userid".getBytes(), t.getText().getBytes()); - return put; - } - - } - - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump"); - - TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput()); - - Status2Hbase hBaseWriter = dag.addOperator("DatabaseWriter", new Status2Hbase()); - - dag.addStream("Statuses", twitterStream.status, hBaseWriter.input).setLocality(Locality.CONTAINER_LOCAL); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java deleted file mode 100644 index d22db40..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -import twitter4j.HashtagEntity; -import twitter4j.Status; - -/** - * <p>TwitterStatusHashtagExtractor class.</p> - * - * @since 1.0.2 - */ -public class TwitterStatusHashtagExtractor extends BaseOperator -{ - public final transient DefaultOutputPort<String> hashtags = new DefaultOutputPort<String>(); - public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>() - { - @Override - public void process(Status status) - { - HashtagEntity[] entities = status.getHashtagEntities(); - if (entities != null) { - for (HashtagEntity he : entities) { - if (he != null) { - hashtags.emit(he.getText()); - } - } - } - } - - }; -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java deleted file mode 100644 index 6dbc436..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java +++ 
/dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -import twitter4j.Status; -import twitter4j.URLEntity; - -/** - * <p>TwitterStatusURLExtractor class.</p> - * - * @since 0.3.2 - */ -public class TwitterStatusURLExtractor extends BaseOperator -{ - public final transient DefaultOutputPort<String> url = new DefaultOutputPort<String>(); - public final transient DefaultInputPort<Status> input = new DefaultInputPort<Status>() - { - @Override - public void process(Status status) - { - URLEntity[] entities = status.getURLEntities(); - if (entities != null) { - for (URLEntity ue: entities) { - if (ue != null) { // see why we intermittently get NPEs - url.emit((ue.getExpandedURL() == null ? 
ue.getURL() : ue.getExpandedURL()).toString()); - } - } - } - } - }; - - private static final Logger LOG = LoggerFactory.getLogger(TwitterStatusURLExtractor.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java deleted file mode 100644 index e05a37a..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.twitter; - -import java.util.Arrays; -import java.util.HashSet; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -/** - * <p>TwitterStatusWordExtractor class.</p> - * - * @since 0.3.2 - */ -public class TwitterStatusWordExtractor extends BaseOperator -{ - public HashSet<String> filterList; - - public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); - public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() - { - @Override - public void process(String text) - { - String[] strs = text.split(" "); - if (strs != null) { - for (String str : strs) { - if (str != null && !filterList.contains(str) ) { - output.emit(str); - } - } - } - } - }; - - @Override - public void setup(OperatorContext context) - { - this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but", - "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when", - "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"})); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java deleted file mode 100644 index 731a38f..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java +++ /dev/null @@ 
-1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import java.net.URI; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Maps; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.Operator; -import com.datatorrent.api.Operator.OutputPort; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.lib.algo.UniqueCounter; -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; -import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; - -/** - * Twitter Demo Application: <br> - * This demo application samples random public status from twitter, send to url - * extractor. 
<br> - * Top 10 url(s) mentioned in tweets in last 5 mins are displayed on every - * window count (500ms).<br> - * <br> - * - * Real Time Calculation :<br> - * This application calculates top 10 url mentioned in tweets in last 5 - * minutes across a 1% random tweet sampling on a rolling window basis.<br> - * <br> - * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a> - * and configure the authentication. For launch from CLI, those go into ~/.dt/dt-site.xml: - * <pre> - * {@code - * <?xml version="1.0" encoding="UTF-8"?> - * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> - * <configuration> - * - * <property> <name>dt.operator.TweetSampler.consumerKey</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.consumerSecret</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.accessToken</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.accessTokenSecret</name> - * <value>TBD</value> </property> - * </configuration> - * } - * </pre> - * Custom Attributes: <br> - * <b>topCounts operator : <b> - * <ul> - * <li>Top Count : 10, number of top unique url to be reported.</li> - * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li> - * <li>window slide value : 1</li> - * </ul> - * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see following output on - * console: - * - * <pre> - * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * topURLs: {http://goo.gl/V0R05=2, 
http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * topURLs: {http://goo.gl/V0R05=2, http://etsy.me/10r1Yg3=6, http://tinyurl.com/88b5jqb=2, http://www.justunfollow.com=4, http://fllwrs.com=2, http://goo.gl/a9Sjp=2, http://goo.gl/iKeVH=2, http://Unfollowers.me=7, http://freetexthost.com/j3y03la4g3=2, http://uranaitter.com=4} - * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished. - * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request - * </pre> - * - * Scaling Options : <br> - * User can scale application by setting intial partition size > 1 on count - * unique operator. 
<br> - * <br> - * - * Application DAG : <br> - * <img src="doc-files/Application.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 500ms(default) <br> - * Operator Details : <br> - * <ul> - * <li><b>The twitterFeed operator : </b> This operator samples random public - * statues from twitter and emits to application. <br> - * Class : com.datatorrent.demos.twitter.TwitterSampleInput <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b>The urlExtractor operator : </b> This operator extracts url from - * random sampled statues from twitter. <br> - * Class : {@link com.datatorrent.demos.twitter.TwitterStatusURLExtractor} <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each - * url extracted from random samples. <br> - * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b> The topCounts operator : </b> This operator caluculates top url in last 1 - * min sliding window count 1. <br> - * Class : com.datatorrent.lib.algo.WindowedTopCounter <br> - * StateFull : Yes, sliding window count 120 (1 min) <br> - * </li> - * <li><b>The operator Console: </b> This operator just outputs the input tuples - * to the console (or stdout). <br> - * </li> - * </ul> - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME) -public class TwitterTopCounterApplication implements StreamingApplication -{ - public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json"; - public static final String CONVERSION_SCHEMA = "twitterURLConverterSchema.json"; - public static final String APP_NAME = "TwitterDemo"; - - private final Locality locality = null; - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Setup the operator to get the data from twitter sample stream injected into the system. 
- TwitterSampleInput twitterFeed = new TwitterSampleInput(); - twitterFeed = dag.addOperator("TweetSampler", twitterFeed); - - // Setup the operator to get the URLs extracted from the twitter statuses - TwitterStatusURLExtractor urlExtractor = dag.addOperator("URLExtractor", TwitterStatusURLExtractor.class); - - // Setup a node to count the unique urls within a window. - UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueURLCounter", new UniqueCounter<String>()); - // Get the aggregated url counts and count them over last 5 mins. - dag.setAttribute(uniqueCounter, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 600); - dag.setAttribute(uniqueCounter, Context.OperatorContext.SLIDE_BY_WINDOW_COUNT, 1); - - - WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); - topCounts.setTopCount(10); - topCounts.setSlidingWindowWidth(1); - topCounts.setDagWindowWidth(1); - - // Feed the statuses from feed into the input of the url extractor. 
- dag.addStream("TweetStream", twitterFeed.status, urlExtractor.input).setLocality(Locality.CONTAINER_LOCAL); - // Start counting the urls coming out of URL extractor - dag.addStream("TwittedURLs", urlExtractor.url, uniqueCounter.data).setLocality(locality); - // Count unique urls - dag.addStream("UniqueURLCounts", uniqueCounter.count, topCounts.input); - - consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url"); - } - - public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias) - { - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - if (!StringUtils.isEmpty(gatewayAddress)) { - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); - - AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap()); - - Map<String, String> conversionMap = Maps.newHashMap(); - conversionMap.put(alias, WindowedTopCounter.FIELD_TYPE); - String snapshotServerJSON = SchemaUtils.jarResourceFileToString(schemaFile); - - snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON); - snapshotServer.setTableFieldToMapField(conversionMap); - - PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery(); - wsQuery.setUri(uri); - snapshotServer.setEmbeddableQueryInfoProvider(wsQuery); - - PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); - wsResult.setUri(uri); - Operator.InputPort<String> queryResultPort = wsResult.input; - - dag.addStream("MapProvider", topCount, snapshotServer.input); - dag.addStream("Result", snapshotServer.queryResult, queryResultPort); - } else { - ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator()); - operator.setStringFormat(operatorName + ": %s"); - - dag.addStream("MapProvider", topCount, operator.input); - } - } -} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java deleted file mode 100644 index 3953ab7..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.twitter; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.lib.algo.UniqueCounter; - -/** - * This application is same as other twitter demo - * {@link com.datatorrent.demos.twitter.TwitterTopCounterApplication} <br> - * Run Sample : - * - * <pre> - * 2013-06-17 16:50:34,911 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl info - Connection established. - * 2013-06-17 16:50:34,912 [Twitter Stream consumer-1[Establishing connection]] INFO twitter4j.TwitterStreamImpl info - Receiving status stream. - * topWords: {} - * topWords: {love=1, ate=1, catch=1, calma=1, Phillies=1, ela=1, from=1, running=1} - * </pre> - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME) -public class TwitterTopWordsApplication implements StreamingApplication -{ - public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json"; - public static final String CONVERSION_SCHEMA = "twitterWordConverterSchema.json"; - public static final String APP_NAME = "RollingTopWordsDemo"; - public static final String PROP_USE_APPDATA = "dt.application." 
+ APP_NAME + ".useAppData"; - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - TwitterSampleInput twitterFeed = new TwitterSampleInput(); - twitterFeed = dag.addOperator("TweetSampler", twitterFeed); - - TwitterStatusWordExtractor wordExtractor = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class); - UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>()); - WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); - - topCounts.setSlidingWindowWidth(120); - topCounts.setDagWindowWidth(1); - - dag.addStream("TweetStream", twitterFeed.text, wordExtractor.input); - dag.addStream("TwittedWords", wordExtractor.output, uniqueCounter.data); - dag.addStream("UniqueWordCounts", uniqueCounter.count, topCounts.input).setLocality(Locality.CONTAINER_LOCAL); - - TwitterTopCounterApplication.consoleOutput(dag, "topWords", topCounts.output, SNAPSHOT_SCHEMA, "word"); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java deleted file mode 100644 index 3597a92..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.lib.algo.UniqueCounter; - -/** - * Twitter Demo Application: <br> - * This demo application samples random public status from twitter, send to Hashtag - * extractor. <br> - * Top 10 Hashtag(s) mentioned in tweets in last 5 mins are displayed on every - * window count (500ms).<br> - * <br> - * - * Real Time Calculation :<br> - * This application calculates top 10 Hashtag mentioned in tweets in last 5 - * minutes across a 1% random tweet sampling on a rolling window basis.<br> - * <br> - * Before running this application, you need to have a <a href="https://dev.twitter.com/apps">Twitter API account</a> - * and configure the authentication. 
For launch from CLI, those go into ~/.dt/dt-site.xml: - * <pre> - * {@code - * <?xml version="1.0" encoding="UTF-8"?> - * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> - * <configuration> - * - * <property> <name>dt.operator.TweetSampler.prop.consumerKey</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.prop.consumerSecret</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.prop.accessToken</name> - * <value>TBD</value> </property> - * - * <property> <name>dt.operator.TweetSampler.prop.accessTokenSecret</name> - * <value>TBD</value> </property> - * </configuration> - * } - * </pre> - * Custom Attributes: <br> - * <b>topCounts operator : <b> - * <ul> - * <li>Top Count : 10, number of top unique Hashtag to be reported.</li> - * <li>Sliding window count : 600, report over last 5 min (600 * .5 / 60 mins)</li> - * <li>window slide value : 1</li> - * </ul> - * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see similar output on console as below: - * - * <pre> - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": 
"{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": 
"{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * topHashtags: {"string": "{\"gameinsight\":\"12\",\"android\":\"8\",\"WotaJKT48alay\":\"12\",\"TernyataJKT48CyberItuForoumSampah\":\"8\",\"NHL15Subban\":\"30\",\"JKT48CYBERKeracunanCUKRIK\":\"8\",\"verifydjzoodel\":\"59\",\"androidgames\":\"9\",\"٠ع_اÙÙÙ\":\"10\",\"Jct48cyberTAIBABI\":\"10\"}"} - * 2013-06-17 14:38:55,201 [main] INFO stram.StramLocalCluster run - Application finished. - * 2013-06-17 14:38:55,201 [container-2] INFO stram.StramChild processHeartbeatResponse - Received shutdown request - * </pre> - * - * Scaling Options : <br> - * User can scale application by setting intial partition size > 1 on count - * unique operator. <br> - * <br> - * - * Application DAG : <br> - * <img src="doc-files/Application.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 500ms(default) <br> - * Operator Details : <br> - * <ul> - * <li><b>The twitterFeed operator : </b> This operator samples random public - * statues from twitter and emits to application. <br> - * Class : com.datatorrent.demos.twitter.TwitterSampleInput <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b>The HashtagExtractor operator : </b> This operator extracts Hashtag from - * random sampled statues from twitter. <br> - * Class : {@link com.datatorrent.demos.twitter.TwitterStatusHashtagExtractor} <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b>The uniqueCounter operator : </b> This operator aggregates count for each - * Hashtag extracted from random samples. 
<br> - * Class : {@link com.datatorrent.lib.algo.UniqueCounter} <br> - * StateFull : No, window count 1 <br> - * </li> - * <li><b> The topCounts operator : </b> This operator caluculates top Hashtag in last 1 - * min sliding window count 1. <br> - * Class : com.datatorrent.lib.algo.WindowedTopCounter <br> - * StateFull : Yes, sliding window count 120 (1 min) <br> - * </li> - * <li><b>The operator Console: </b> This operator just outputs the input tuples - * to the console (or stdout). <br> - * </li> - * </ul> - * - * @since 1.0.2 - */ -@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME) -public class TwitterTrendingHashtagsApplication implements StreamingApplication -{ - public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json"; - public static final String CONVERSION_SCHEMA = "twitterHashTagConverterSchema.json"; - public static final String APP_NAME = "TwitterTrendingDemo"; - public static final String PROP_USE_APPDATA = "dt.application." + APP_NAME + ".useAppData"; - - private final Locality locality = null; - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Setup the operator to get the data from twitter sample stream injected into the system. - TwitterSampleInput twitterFeed = new TwitterSampleInput(); - twitterFeed = dag.addOperator("TweetSampler", twitterFeed); - - // Setup a node to count the unique Hashtags within a window. - UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueHashtagCounter", new UniqueCounter<String>()); - - // Get the aggregated Hashtag counts and count them over last 5 mins. 
- WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>()); - topCounts.setTopCount(10); - topCounts.setSlidingWindowWidth(600); - topCounts.setDagWindowWidth(1); - - dag.addStream("TwittedHashtags", twitterFeed.hashtag, uniqueCounter.data).setLocality(locality); - // Count unique Hashtags - dag.addStream("UniqueHashtagCounts", uniqueCounter.count, topCounts.input); - - TwitterTopCounterApplication.consoleOutput(dag, "topHashtags", topCounts.output, SNAPSHOT_SCHEMA, "hashtag"); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java deleted file mode 100644 index 43ed8f7..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.twitter; - -import java.nio.ByteBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.netlet.util.Slice; - -/** - * <p>URLSerDe class.</p> - * - * @since 0.3.2 - */ -public class URLSerDe implements StreamCodec<byte[]> -{ - /** - * Convert the bytes into object useful for downstream node. - * - * @param fragment - * @return byte array which represents the bytes. - */ - @Override - public byte[] fromByteArray(Slice fragment) - { - if (fragment == null || fragment.buffer == null) { - return null; - } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) { - return fragment.buffer; - } else { - byte[] buffer = new byte[fragment.buffer.length]; - System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length); - return buffer; - } - } - - /** - * Wrap the input byte[] in a Slice. - * - * @param object - byte array representing the bytes of the string - * @return a Slice covering the whole input array - */ - @Override - public Slice toByteArray(byte[] object) - { - return new Slice(object, 0, object.length); - } - - @Override - public int getPartition(byte[] object) - { - ByteBuffer bb = ByteBuffer.wrap(object); - return bb.hashCode(); - } - - private static final Logger logger = LoggerFactory.getLogger(URLSerDe.class); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java deleted file mode 100644 index 20bb673..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java +++ /dev/null @@ -1,282 +0,0 @@ -/** - * Licensed to the Apache Software
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -/** - * - * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size. - * The operator expects to receive a map object which contains a set of objects mapped to their respective frequency of - * occurrences. e.g. if we are looking at most commonly occurring names then the operator expects to receive the tuples - * of type Map<String, Integer> on its input port, and at the end of the window it emits 1 object of type Map<String, Integer> - * with a pre determined size. The emitted object contains the most frequently occurring keys.
- * - * @param <T> Type of the key in the map object which is accepted on input port as payload. Note that this key must be HashMap friendly. - * @since 0.3.2 - */ -public class WindowedTopCounter<T> extends BaseOperator -{ - public static final String FIELD_TYPE = "type"; - public static final String FIELD_COUNT = "count"; - - private static final Logger logger = LoggerFactory.getLogger(WindowedTopCounter.class); - - private PriorityQueue<SlidingContainer<T>> topCounter; - private int windows; - private int topCount = 10; - private int slidingWindowWidth; - private int dagWindowWidth; - private HashMap<T, SlidingContainer<T>> objects = new HashMap<T, SlidingContainer<T>>(); - - /** - * Input port on which map objects containing keys with their respective frequency as values will be accepted. - */ - public final transient DefaultInputPort<Map<T, Integer>> input = new DefaultInputPort<Map<T, Integer>>() - { - @Override - public void process(Map<T, Integer> map) - { - for (Map.Entry<T, Integer> e : map.entrySet()) { - SlidingContainer<T> holder = objects.get(e.getKey()); - if (holder == null) { - holder = new SlidingContainer<T>(e.getKey(), windows); - objects.put(e.getKey(), holder); - } - holder.adjustCount(e.getValue()); - } - } - }; - - public final transient DefaultOutputPort<List<Map<String, Object>>> output = new DefaultOutputPort<List<Map<String, Object>>>(); - - @Override - public void setup(OperatorContext context) - { - windows = (int)(slidingWindowWidth / dagWindowWidth) + 1; - if (slidingWindowWidth % dagWindowWidth != 0) { - logger.warn("slidingWindowWidth(" + slidingWindowWidth + ") is not exact multiple of dagWindowWidth(" + dagWindowWidth + ")"); - } - - topCounter = new PriorityQueue<SlidingContainer<T>>(this.topCount, new TopSpotComparator()); - } - - @Override - public void beginWindow(long windowId) - { - topCounter.clear(); - } - - @Override - public void endWindow() - { - Iterator<Map.Entry<T, SlidingContainer<T>>> iterator = 
objects.entrySet().iterator(); - int i = topCount; - - /* - * Try to fill the priority queue with the first topCount URLs. - */ - SlidingContainer<T> holder; - while (iterator.hasNext()) { - holder = iterator.next().getValue(); - holder.slide(); - - if (holder.totalCount == 0) { - iterator.remove(); - } else { - topCounter.add(holder); - if (--i == 0) { - break; - } - } - } - logger.debug("objects.size(): {}", objects.size()); - - /* - * Make room for the new element in the priority queue by deleting the - * smallest one, if we KNOW that the new element is useful to us. - */ - if (i == 0) { - int smallest = topCounter.peek().totalCount; - while (iterator.hasNext()) { - holder = iterator.next().getValue(); - holder.slide(); - - if (holder.totalCount > smallest) { - topCounter.poll(); - topCounter.add(holder); - smallest = topCounter.peek().totalCount; - } else if (holder.totalCount == 0) { - iterator.remove(); - } - } - } - - List<Map<String, Object>> data = Lists.newArrayList(); - - Iterator<SlidingContainer<T>> topIter = topCounter.iterator(); - - while (topIter.hasNext()) { - final SlidingContainer<T> wh = topIter.next(); - Map<String, Object> tableRow = Maps.newHashMap(); - - tableRow.put(FIELD_TYPE, wh.identifier.toString()); - tableRow.put(FIELD_COUNT, wh.totalCount); - - data.add(tableRow); - } - - Collections.sort(data, TwitterOutputSorter.INSTANCE); - - output.emit(data); - topCounter.clear(); - } - - @Override - public void teardown() - { - topCounter = null; - objects = null; - } - - /** - * Set the count of most frequently occurring keys to emit per map object. - * - * @param count count of the objects in the map emitted at the output port. 
- */ - public void setTopCount(int count) - { - topCount = count; - } - - public int getTopCount() - { - return topCount; - } - - /** - * @return the windows - */ - public int getWindows() - { - return windows; - } - - /** - * @param windows the windows to set - */ - public void setWindows(int windows) - { - this.windows = windows; - } - - /** - * @return the slidingWindowWidth - */ - public int getSlidingWindowWidth() - { - return slidingWindowWidth; - } - - /** - * Set the width of the sliding window. - * - * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently - * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms, - * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000. - * - * @param slidingWindowWidth - Sliding window width to be set for this operator, recommended to be multiple of DAG window. - */ - public void setSlidingWindowWidth(int slidingWindowWidth) - { - this.slidingWindowWidth = slidingWindowWidth; - } - - /** - * @return the dagWindowWidth - */ - public int getDagWindowWidth() - { - return dagWindowWidth; - } - - /** - * Set the width of the DAG window. - * - * Sliding window is typically much larger than the dag window. e.g. One may want to measure the most frequently - * occurring keys over the period of 5 minutes. So if dagWindowWidth (which is by default 500ms) is set to 500ms, - * the slidingWindowWidth would be (60 * 5 * 1000 =) 300000. - * - * @param dagWindowWidth - DAG's native window width. It has to be the value of the native window set at the application level.
- */ - public void setDagWindowWidth(int dagWindowWidth) - { - this.dagWindowWidth = dagWindowWidth; - } - - static class TopSpotComparator implements Comparator<SlidingContainer<?>> - { - @Override - public int compare(SlidingContainer<?> o1, SlidingContainer<?> o2) - { - if (o1.totalCount > o2.totalCount) { - return 1; - } else if (o1.totalCount < o2.totalCount) { - return -1; - } - - return 0; - } - } - - private static class TwitterOutputSorter implements Comparator<Map<String, Object>> - { - public static final TwitterOutputSorter INSTANCE = new TwitterOutputSorter(); - - private TwitterOutputSorter() - { - } - - @Override - public int compare(Map<String, Object> o1, Map<String, Object> o2) - { - Integer count1 = (Integer)o1.get(FIELD_COUNT); - Integer count2 = (Integer)o2.get(FIELD_COUNT); - - return count1.compareTo(count2); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif deleted file mode 100644 index d21e1d9..0000000 Binary files a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/doc-files/Application.gif and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java deleted file mode 100644 index 5a02e4b..0000000 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Twitter top URL's demonstration application. - */ -package com.datatorrent.demos.twitter; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml b/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml deleted file mode 100644 index 7d45153..0000000 --- a/demos/twitter/src/main/resources/META-INF/properties-TwitterKinesisDemo.xml +++ /dev/null @@ -1,52 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> - -<!-- properties for the twitter kinesis demo --> -<configuration> - - <property> - <name>dt.operator.FromKinesis.streamName</name> - <value>TwitterTag</value> - </property> - <property> - <name>dt.operator.FromKinesis.accessKey</name> - </property> - <property> - <name>dt.operator.FromKinesis.secretKey</name> - </property> - <property> - <name>dt.operator.FromKinesis.endPoint</name> - </property> - <property> - <name>dt.operator.ToKinesis.streamName</name> - <value>TwitterTag</value> - </property> - <property> - <name>dt.operator.ToKinesis.accessKey</name> - </property> - <property> - <name>dt.operator.ToKinesis.secretKey</name> - </property> - <property> - <name>dt.operator.ToKinesis.endPoint</name> - </property> - -</configuration>
