http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/META-INF/properties.xml b/demos/twitter/src/main/resources/META-INF/properties.xml deleted file mode 100644 index e3042fa..0000000 --- a/demos/twitter/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,121 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> - -<!-- properties for rolling top words demo --> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1256</value> - </property> - <!-- default operator size 256MB --> - <property> - <name>dt.application.*.operator.*.attr.MEMORY_MB</name> - <value>256</value> - </property> - <property> - <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> - <value>-Xmx128M</value> - </property> - - <!-- default buffer memory 256MB --> - <property> - <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> - <value>256</value> - </property> - <property> - <name>dt.operator.TweetSampler.consumerKey</name> - </property> - <property> - <name>dt.operator.TweetSampler.consumerSecret</name> - </property> - <property> - <name>dt.operator.TweetSampler.accessToken</name> - </property> - <property> - <name>dt.operator.TweetSampler.accessTokenSecret</name> - </property> - <property> - <name>dt.operator.TweetSampler.feedMultiplierVariance</name> - <value>5</value> - </property> - <property> - <name>dt.operator.TweetSampler.feedMultiplier</name> - <value>20</value> - </property> - - <!-- RollingTopWordsDemo --> - - <property> - <name>dt.application.RollingTopWordsDemo.operator.TopCounter.topCount</name> - <value>10</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.stream.TweetStream.locality</name> - <value>CONTAINER_LOCAL</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> - <value>TwitterWordsQuery</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.operator.QueryResult.topic</name> - <value>TwitterWordsQueryResult</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.operator.QueryResult.numRetries</name> - <value>2147483647</value> - </property> - - <!-- TwitterDemo --> - - <property> - 
<name>dt.application.TwitterDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> - <value>TwitterURLQuery</value> - </property> - <property> - <name>dt.application.TwitterDemo.operator.QueryResult.topic</name> - <value>TwitterURLQueryResult</value> - </property> - <property> - <name>dt.application.TwitterDemo.operator.QueryResult.numRetries</name> - <value>2147483647</value> - </property> - <property> - <name>dt.application.TwitterDemo.operator.UniqueURLCounter.attr.APPLICATION_WINDOW_COUNT</name> - <value>60</value> - </property> - - <!-- TwitterTrendingDemo --> - - <property> - <name>dt.application.TwitterTrendingDemo.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> - <value>TwitterHashtagQueryDemo</value> - </property> - <property> - <name>dt.application.TwitterTrendingDemo.operator.QueryResult.topic</name> - <value>TwitterHashtagQueryResultDemo</value> - </property> - <property> - <name>dt.application.TwitterTrendingDemo.operator.QueryResult.numRetries</name> - <value>2147483647</value> - </property> - -</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/mysql.sql ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/mysql.sql b/demos/twitter/src/main/resources/mysql.sql deleted file mode 100644 index e0b97dd..0000000 --- a/demos/twitter/src/main/resources/mysql.sql +++ /dev/null @@ -1,35 +0,0 @@ --- --- Licensed to the Apache Software Foundation (ASF) under one --- or more contributor license agreements. See the NOTICE file --- distributed with this work for additional information --- regarding copyright ownership. The ASF licenses this file --- to you under the Apache License, Version 2.0 (the --- "License"); you may not use this file except in compliance --- with the License. You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, --- software distributed under the License is distributed on an --- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY --- KIND, either express or implied. See the License for the --- specific language governing permissions and limitations --- under the License. 
--- - -DROP TABLE if exists tweets; -CREATE TABLE tweets ( - window_id LONG NOT NULL, - creation_date DATE, - text VARCHAR(256) NOT NULL, - userid VARCHAR(40) NOT NULL, - KEY ( userid, creation_date) - ); - -drop table if exists dt_window_id_tracker; -CREATE TABLE dt_window_id_tracker ( - dt_application_id VARCHAR(100) NOT NULL, - dt_operator_id int(11) NOT NULL, - dt_window_id bigint NOT NULL, - UNIQUE (dt_application_id, dt_operator_id, dt_window_id) -) ENGINE=MyISAM DEFAULT CHARSET=latin1; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/top_urls.tplg.properties ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/top_urls.tplg.properties b/demos/twitter/src/main/resources/top_urls.tplg.properties deleted file mode 100644 index c106d7d..0000000 --- a/demos/twitter/src/main/resources/top_urls.tplg.properties +++ /dev/null @@ -1,48 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -stram.node.twitterfeed.classname=com.datatorrent.example.twitter.TwitterSampleInput - -stram.stream.status.source=twitterfeed.output -stram.stream.status.sinks=urlextractor.input - -stram.node.urlextractor.classname=com.datatorrent.example.twitter.TwitterStatusURLExtractor - -stram.stream.collapsedurls.source=urlextractor.output -stram.stream.collapsedurls.sinks= - -stram.node. -stram.stream.partitionedtf.input=twitterfeed.output -stram.stream.partitionedtf.output=partitioned_counter.input -stram.stream.partitionedtf.serdeClassname=com.datatorrent.example.twitter.URLSerDe - -stram.node.partitioned_counter.classname=com.datatorrent.example.twitter.PartitionedCounter -stram.node.partitioned_counter.topCount=10 - -stram.stream.merge_stream.input=partitioned_counter.output -stram.stream.merge_stream.output=merge_counter.input -stram.stream.merge_stream.serdeClassname=com.datatorrent.example.twitter.URLHolderSerde - -stram.node.merge_counter.classname=com.datatorrent.example.twitter.MergeSorter -stram.node.merge_counter.topCount=10 - -stram.stream.merged_stream.input=merge_counter.output -stram.stream.merged_stream.output=console.input - -stram.node.console.classname=com.datatorrent.stream.ConsoleOutputStream http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/twitterHashTagDataSchema.json ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/twitterHashTagDataSchema.json b/demos/twitter/src/main/resources/twitterHashTagDataSchema.json deleted file mode 100644 index 0c9296c..0000000 --- a/demos/twitter/src/main/resources/twitterHashTagDataSchema.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "values": [{"name": "hashtag", "type": "string"}, - {"name": "count", "type": "integer"}] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/twitterURLDataSchema.json 
---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/twitterURLDataSchema.json b/demos/twitter/src/main/resources/twitterURLDataSchema.json deleted file mode 100644 index ecf723e..0000000 --- a/demos/twitter/src/main/resources/twitterURLDataSchema.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "values": [{"name": "url", "type": "string"}, - {"name": "count", "type": "integer"}] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/main/resources/twitterWordDataSchema.json ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/resources/twitterWordDataSchema.json b/demos/twitter/src/main/resources/twitterWordDataSchema.json deleted file mode 100644 index 5e8e7c0..0000000 --- a/demos/twitter/src/main/resources/twitterWordDataSchema.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "values": [{"name": "word", "type": "string"}, - {"name": "count", "type": "integer"}] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java deleted file mode 100644 index cd211ff..0000000 --- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import static org.junit.Assert.assertEquals; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; - -/** - * Test for the application which taps into the twitter's sample input stream and - * dumps all the tweets into a database. - */ -public class TwitterDumpApplicationTest -{ - @Test - public void testPopulateDAG() throws Exception - { - Configuration configuration = new Configuration(false); - - LocalMode lm = LocalMode.newInstance(); - DAG prepareDAG = lm.prepareDAG(new TwitterDumpApplication(), configuration); - DAG clonedDAG = lm.cloneDAG(); - - assertEquals("Serialization", prepareDAG, clonedDAG); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java deleted file mode 100644 index 91a4e20..0000000 --- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; -import com.datatorrent.contrib.twitter.TwitterSampleInput; - -/** - * Test the DAG declaration in local mode. - */ -public class TwitterTopCounterTest -{ - /** - * This test requires twitter authentication setup and is skipped by default - * (see {@link TwitterSampleInput}). 
- * - * @throws Exception - */ - @Test - public void testApplication() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - new TwitterTopCounterApplication().populateDAG(lma.getDAG(), new Configuration(false)); - LocalMode.Controller lc = lma.getController(); - lc.run(120000); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java deleted file mode 100644 index 4ac2e8d..0000000 --- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.twitter; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; -import com.datatorrent.contrib.twitter.TwitterSampleInput; - -/** - * Test the DAG declaration in local mode. 
- */ -public class TwitterTopWordsTest -{ - /** - * This test requires twitter authentication setup and is skipped by default - * (see {@link TwitterSampleInput}). - * - * @throws Exception - */ - @Test - public void testApplication() throws Exception - { - TwitterTopWordsApplication app = new TwitterTopWordsApplication(); - Configuration conf = new Configuration(false); - conf.addResource("dt-site-rollingtopwords.xml"); - LocalMode lma = LocalMode.newInstance(); - lma.prepareDAG(app, conf); - LocalMode.Controller lc = lma.getController(); - lc.run(120000); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml b/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml deleted file mode 100644 index b1d4153..0000000 --- a/demos/twitter/src/test/resources/dt-site-rollingtopwords.xml +++ /dev/null @@ -1,73 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> -<property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.class</name> - <value>com.datatorrent.demos.twitter.TwitterTopWordsApplication</value> - <description>An alias for the application</description> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.operator.TopCounter.topCount - </name> - <value>10</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.stream.TweetStream.locality - </name> - <value>CONTAINER_LOCAL</value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.stream.TwittedWords.locality - </name> - <value></value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.stream.UniqueWordCounts.locality - </name> - <value></value> - </property> - <property> - <name>dt.application.RollingTopWordsDemo.stream.TopWords.locality</name> - <value></value> - </property> - <property> - <name>dt.operator.TweetSampler.consumerKey</name> - <value>r1DqM35iCTjbgLHf1R7rDbF5R</value> - </property> - <property> - <name>dt.operator.TweetSampler.consumerSecret</name> - <value>KBpZiR0glPzvZPm1Sa7sq9MCGQ2H2DrVChNtmYQcU75fwuHjed</value> - </property> - <property> - <name>dt.operator.TweetSampler.accessToken</name> - <value>2490355837-lXKev9vIGzftDjDu9LlyQiuGAqjYyPELFRyRpQo</value> - </property> - <property> - <name>dt.operator.TweetSampler.accessTokenSecret</name> - <value>lmAxZlFhcBqeTxmyatFT43fzshzrv6lsOAtHDsCBLjiuk</value> - </property> - -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/twitter/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/resources/log4j.properties b/demos/twitter/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/twitter/src/test/resources/log4j.properties +++ 
/dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/pom.xml 
---------------------------------------------------------------------- diff --git a/demos/uniquecount/pom.xml b/demos/uniquecount/pom.xml deleted file mode 100644 index 7b402fc..0000000 --- a/demos/uniquecount/pom.xml +++ /dev/null @@ -1,50 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>uniquecount</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Unique Count Demo</name> - <description></description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>3.1</version> - <scope>provided</scope> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/assemble/appPackage.xml b/demos/uniquecount/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/uniquecount/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. 
See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java deleted file mode 100644 index 57ef1a1..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.lib.algo.UniqueCounter; -import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.stream.Counter; -import com.datatorrent.lib.stream.StreamDuplicater; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * Application to demonstrate PartitionableUniqueCount operator. <br> - * The input operator generate random keys, which is sent to - * PartitionableUniqueCount operator initially partitioned into three partitions to - * test unifier functionality, and output of the operator is sent to verifier to verify - * that it generates correct result. 
- * - * @since 1.0.2 - */ -@ApplicationAnnotation(name = "UniqueValueCountDemo") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration entries) - { - /* Generate random key-value pairs */ - RandomKeysGenerator randGen = dag.addOperator("randomgen", new RandomKeysGenerator()); - - - /* Initialize with three partition to start with */ - // UniqueCount1 uniqCount = dag.addOperator("uniqevalue", new UniqueCount1()); - UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>()); - - MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter()); - - uniqCount.setCumulative(false); - dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3)); - - CountVerifier<Integer> verifier = dag.addOperator("verifier", new CountVerifier<Integer>()); - StreamDuplicater<KeyHashValPair<Integer, Integer>> dup = dag.addOperator("dup", new StreamDuplicater<KeyHashValPair<Integer, Integer>>()); - ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator()); - - ConsoleOutputOperator successOutput = dag.addOperator("successoutput", new ConsoleOutputOperator()); - successOutput.setStringFormat("Success %d"); - ConsoleOutputOperator failureOutput = dag.addOperator("failureoutput", new ConsoleOutputOperator()); - failureOutput.setStringFormat("Failure %d"); - - // success and failure counters. 
- Counter successcounter = dag.addOperator("successcounter", new Counter()); - Counter failurecounter = dag.addOperator("failurecounter", new Counter()); - - dag.addStream("datain", randGen.outPort, uniqCount.data); - dag.addStream("dataverification0", randGen.verificationPort, verifier.in1); - dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL); - dag.addStream("split", converter.output, dup.data); - dag.addStream("consoutput", dup.out1, output.input); - dag.addStream("dataverification1", dup.out2, verifier.in2); - dag.addStream("successc", verifier.successPort, successcounter.input); - dag.addStream("failurec", verifier.failurePort, failurecounter.input); - dag.addStream("succconsoutput", successcounter.output, successOutput.input); - dag.addStream("failconsoutput", failurecounter.output, failureOutput.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java deleted file mode 100644 index d201037..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import java.util.HashMap; -import java.util.Map; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.util.KeyHashValPair; - -/* -Compare results and print non-matching values to console. - */ -/** - * <p>CountVerifier class.</p> - * - * @since 1.0.2 - */ -public class CountVerifier<K> implements Operator -{ - HashMap<K, Integer> map1 = new HashMap<K, Integer>(); - HashMap<K, Integer> map2 = new HashMap<K, Integer>(); - - public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = new DefaultInputPort<KeyHashValPair<K, Integer>>() - { - @Override - public void process(KeyHashValPair<K, Integer> tuple) - { - processTuple(tuple, map1); - } - }; - - public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = new DefaultInputPort<KeyHashValPair<K, Integer>>() - { - @Override - public void process(KeyHashValPair<K, Integer> tuple) - { - processTuple(tuple, map2); - } - }; - - void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map) - { - map.put(tuple.getKey(), tuple.getValue()); - } - - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>(); - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Integer> failurePort = new 
DefaultOutputPort<Integer>(); - - @Override - public void beginWindow(long l) - { - - } - - @Override - public void endWindow() - { - int failureCount = 0; - for (Map.Entry<K, Integer> e : map1.entrySet()) { - K key = e.getKey(); - int val = map2.get(key); - if (val != e.getValue()) { - failureCount++; - } - } - if (failureCount != 0) { - failurePort.emit(failureCount); - } else { - successPort.emit(map1.size()); - } - } - - @Override - public void setup(Context.OperatorContext operatorContext) - { - - } - - @Override - public void teardown() - { - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java deleted file mode 100644 index e806759..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.uniquecount; - -import java.util.HashMap; -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Generate random Key value pairs. - * key is string and value is int, it emits the pair as KeyValPair on outPort, - * - * @since 1.0.2 - */ -public class RandomDataGenerator implements InputOperator -{ - public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort = new DefaultOutputPort<KeyValPair<String, Object>>(); - private HashMap<String, Integer> dataInfo; - private final transient Logger LOG = LoggerFactory.getLogger(RandomDataGenerator.class); - private int count; - private int sleepMs = 10; - private int keyRange = 100; - private int valueRange = 100; - private long tupleBlast = 10000; - private Random random; - - public RandomDataGenerator() - { - random = new Random(); - } - - @Override - public void emitTuples() - { - for (int i = 0; i < tupleBlast; i++) { - String key = String.valueOf(random.nextInt(keyRange)); - int val = random.nextInt(valueRange); - outPort.emit(new KeyValPair<String, Object>(key, val)); - } - try { - Thread.sleep(sleepMs); - } catch (Exception ex) { - LOG.error(ex.getMessage()); - } - count++; - } - - public int getSleepMs() - { - return sleepMs; - } - - public void setSleepMs(int sleepMs) - { - this.sleepMs = sleepMs; - } - - public long getTupleBlast() - { - return tupleBlast; - } - - public void setTupleBlast(long tupleBlast) - { - this.tupleBlast = tupleBlast; - } - - @Override - public void beginWindow(long l) - { - - } - - @Override - public void endWindow() - { - LOG.debug("emitTuples called " + count + " times in this window"); - count = 0; - } - - @Override - public void setup(Context.OperatorContext operatorContext) - { - - } - - @Override - public void teardown() 
- { - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java deleted file mode 100644 index 28f3bc0..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import java.util.BitSet; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Input port operator for generating random values on keys. 
<br> - * Key(s) : key + integer in range between 0 and numKeys <br> - * Value(s) : integer in range of 0 to numValuesPerKeys <br> - * - * @since 0.9.3 - */ -public class RandomKeyValues implements InputOperator -{ - public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>(); - private Random random = new Random(11111); - private int numKeys; - private int numValuesPerKeys; - private int tuppleBlast = 1000; - private int emitDelay = 20; /* 20 ms */ - - /* For verification */ - private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>(); - - public RandomKeyValues() - { - this.numKeys = 100; - this.numValuesPerKeys = 100; - } - - public RandomKeyValues(int keys, int values) - { - this.numKeys = keys; - this.numValuesPerKeys = values; - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - @Override - public void emitTuples() - { - /* generate tuples randomly, */ - for (int i = 0; i < tuppleBlast; i++) { - int intKey = random.nextInt(numKeys); - String key = "key" + String.valueOf(intKey); - int value = random.nextInt(numValuesPerKeys); - - // update history for verifying later. - BitSet bmap = history.get(intKey); - if (bmap == null) { - bmap = new BitSet(); - history.put(intKey, bmap); - } - bmap.set(value); - - // emit the key with value. - outport.emit(new KeyValPair<String, Object>(key, value)); - } - try { - Thread.sleep(emitDelay); - } catch (Exception e) { - // Ignore. 
- } - } - - public int getNumKeys() - { - return numKeys; - } - - public void setNumKeys(int numKeys) - { - this.numKeys = numKeys; - } - - public int getNumValuesPerKeys() - { - return numValuesPerKeys; - } - - public void setNumValuesPerKeys(int numValuesPerKeys) - { - this.numValuesPerKeys = numValuesPerKeys; - } - - public int getTuppleBlast() - { - return tuppleBlast; - } - - public void setTuppleBlast(int tuppleBlast) - { - this.tuppleBlast = tuppleBlast; - } - - public int getEmitDelay() - { - return emitDelay; - } - - public void setEmitDelay(int emitDelay) - { - this.emitDelay = emitDelay; - } - - public void debug() - { - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java deleted file mode 100644 index eb9d22c..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import org.apache.commons.lang3.mutable.MutableInt; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.datatorrent.lib.util.KeyHashValPair; - -/* - Generate random keys. - */ -/** - * <p>RandomKeysGenerator class.</p> - * - * @since 1.0.2 - */ -public class RandomKeysGenerator implements InputOperator -{ - - protected int numKeys = 100; - protected int tupleBlast = 15000; - protected long sleepTime = 0; - protected Map<Integer, MutableInt> history = new HashMap<Integer, MutableInt>(); - private Random random = new Random(); - private Date date = new Date(); - private long start; - - @OutputPortFieldAnnotation(optional = false) - public transient DefaultOutputPort<Integer> outPort = new DefaultOutputPort<Integer>(); - - @OutputPortFieldAnnotation(optional = true) - public transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> verificationPort = - new DefaultOutputPort<KeyHashValPair<Integer, Integer>>(); - - @Override - public void emitTuples() - { - for (int i = 0; i < tupleBlast; i++) { - int key = random.nextInt(numKeys); - outPort.emit(key); - - - if (verificationPort.isConnected()) { - // maintain history for later verification. - MutableInt count = history.get(key); - if (count == null) { - count = new MutableInt(0); - history.put(key, count); - } - count.increment(); - } - - } - try { - if (sleepTime != 0) { - Thread.sleep(sleepTime); - } - } catch (Exception ex) { - // Ignore. 
- } - } - - public RandomKeysGenerator() - { - start = date.getTime(); - } - - @Override - public void beginWindow(long l) - { - - } - - @Override - public void endWindow() - { - - if (verificationPort.isConnected()) { - for (Map.Entry<Integer, MutableInt> e : history.entrySet()) { - verificationPort.emit(new KeyHashValPair<Integer, Integer>(e.getKey(), e.getValue().toInteger())); - } - history.clear(); - } - - } - - @Override - public void setup(Context.OperatorContext operatorContext) - { - - } - - @Override - public void teardown() - { - - } - - public int getNumKeys() - { - return numKeys; - } - - public void setNumKeys(int numKeys) - { - this.numKeys = numKeys; - } - - public int getTupleBlast() - { - return tupleBlast; - } - - public void setTupleBlast(int tupleBlast) - { - this.tupleBlast = tupleBlast; - } - - public long getSleepTime() - { - return sleepTime; - } - - public void setSleepTime(long sleepTime) - { - this.sleepTime = sleepTime; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java deleted file mode 100644 index eb9e392..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.common.partitioner.StatelessPartitioner; - -import com.datatorrent.lib.algo.UniqueCounter; -import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.util.KeyValPair; - -/** - * <p>UniqueKeyValCountDemo class.</p> - * - * @since 1.0.2 - */ -@ApplicationAnnotation(name = "UniqueKeyValueCountDemo") -public class UniqueKeyValCountDemo implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration entries) - { - /* Generate random key-value pairs */ - RandomDataGenerator randGen = dag.addOperator("randomgen", new RandomDataGenerator()); - - /* Initialize with three partition to start with */ - UniqueCounter<KeyValPair<String, Object>> uniqCount = - dag.addOperator("uniqevalue", new UniqueCounter<KeyValPair<String, Object>>()); - MapToKeyHashValuePairConverter<KeyValPair<String, Object>, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter()); - uniqCount.setCumulative(false); - dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<KeyValPair<String, Object>>>(3)); - - ConsoleOutputOperator output = 
dag.addOperator("output", new ConsoleOutputOperator()); - - dag.addStream("datain", randGen.outPort, uniqCount.data); - dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL); - dag.addStream("consoutput", converter.output, output.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java deleted file mode 100644 index 6f81c0d..0000000 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - Demo Application for new Paritionable UniqueCount Operator. 
- */ -package com.datatorrent.demos.uniquecount; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/resources/META-INF/properties.xml b/demos/uniquecount/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 8742328..0000000 --- a/demos/uniquecount/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,29 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <!-- - <property> - <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> - <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> - </property> - --> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/site/conf/my-app-conf1.xml b/demos/uniquecount/src/site/conf/my-app-conf1.xml deleted file mode 100644 index f35873b..0000000 --- a/demos/uniquecount/src/site/conf/my-app-conf1.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java deleted file mode 100644 index 66a0af1..0000000 --- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode. 
- */ -public class ApplicationTest -{ - @Test - public void testApplication() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - new Application().populateDAG(lma.getDAG(), new Configuration(false)); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java deleted file mode 100644 index a198247..0000000 --- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.uniquecount; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode. 
- */ -public class UniqueKeyValDemoTest -{ - @Test - public void testApplication() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - new UniqueKeyValCountDemo().populateDAG(lma.getDAG(), new Configuration(false)); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/uniquecount/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/test/resources/log4j.properties b/demos/uniquecount/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/uniquecount/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/pom.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/pom.xml b/demos/wordcount/pom.xml deleted file mode 100644 index 410daea..0000000 --- a/demos/wordcount/pom.xml +++ /dev/null @@ -1,49 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>wordcount-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Wordcount Demo</name> - <description>A very simple application that demonstrates Apex Platform’s streaming window feature.</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <properties> - <skipTests>true</skipTests> - </properties> - - <dependencies> - <dependency> - <groupId>it.unimi.dsi</groupId> - <artifactId>fastutil</artifactId> - <version>6.6.4</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/assemble/appPackage.xml b/demos/wordcount/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/wordcount/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership.
The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java ---------------------------------------------------------------------- diff 
--git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java deleted file mode 100644 index d0512cf..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.wordcount; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.algo.UniqueCounter; -import com.datatorrent.lib.io.ConsoleOutputOperator; - -/** - * Simple Word Count Demo : <br> - * This is application to count total occurrence of each word from file or any - * stream. <br> - * <br> - * - * Functional Description : <br> - * This demo declares custom input operator to read data file set by user. <br> - * This input operator can be replaced by any stream input operator. 
<br> - * <br> - * - * Custom Attribute(s) : None <br> - * <br> - * - * Input Adapter : <br> - * Word input operator opens user specified data file and streams each line to - * application. <br> - * <br> - * - * Output Adapter : <br> - * Output values are written to console through ConsoleOutputOerator<br> - * If needed you can use other output adapters<br> - * <br> - * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see following output on console: <br> - * </pre> - * {developed=1} {bush’s=2} {roster=1} {council=1} {mankiw=1} {academia=1} - * {of=6} {help=1} {are=1} {presidential=1} - * </pre> <br> <br> - * - * Scaling Options : <br> - * This operator app can not be scaled, please look at implementation {@link com.datatorrent.lib.algo.UniqueCounterEach} <br> <br> - * - * Application DAG : <br> - * <img src="doc-files/UniqueWordCounter.jpg" width=600px > <br> - * - * Streaming Window Size : 500ms - * Operator Details : <br> - * <ul> - * <li> - * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application. - * This can replaced by any input stream by user. <br> - * Class : {@link com.datatorrent.demos.wordcount.WordCountInputOperator} <br> - * Operator Application Window Count : 1 <br> - * StateFull : No - * </li> - * <li> - * <p><b> The operator count : </b> This operator aggregates unique key count over one window count(app). <br> - * Class : {@link com.datatorrent.lib.algo.UniqueCounterEach} <br> - * Operator Application Window Count : 1 <br> - * StateFull : No - * </li> - * <li> - * <p><b>The operator Console: </b> This operator just outputs the input tuples to the console (or stdout). - * This case it emits unique count of each word over 500ms. 
- * </li> - * </ul> - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = "WordCountDemo") -public class Application implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - WordCountInputOperator input = dag.addOperator("wordinput", new WordCountInputOperator()); - UniqueCounter<String> wordCount = dag.addOperator("count", new UniqueCounter<String>()); - dag.addStream("wordinput-count", input.outputPort, wordCount.data); - ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator()); - dag.addStream("count-console",wordCount.count, consoleOperator.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java deleted file mode 100644 index 7e5bb93..0000000 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/ApplicationWithQuerySupport.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.wordcount; - -import java.net.URI; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.Operator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; -import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; - -/** - * Simple demo that computes word frequencies from any file dropped into a - * monitored directory. It outputs the top N word-frequency pairs for each file - * as well globally across all files. - * <p> - * Each input file generates a corresponding output file in the output directory - * containing the top N pairs for that file. The output is also written - * to an internal store to support visualization in the UI via queries. - * <p> - * @since 3.2.0 - */ -@ApplicationAnnotation(name = "TopNWordsWithQueries") -public class ApplicationWithQuerySupport implements StreamingApplication -{ - private static final Logger LOG = LoggerFactory.getLogger(ApplicationWithQuerySupport.class); - - /** - * Name of schema file. 
- */ - public static final String SNAPSHOT_SCHEMA = "WordDataSchema.json"; - - /** - * Populates the DAG with operators and connecting streams - * - * @param dag The directed acyclic graph of operators to populate - * @param conf The configuration - */ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // create operators - LineReader lineReader = dag.addOperator("lineReader", new LineReader()); - WordReader wordReader = dag.addOperator("wordReader", new WordReader()); - WindowWordCount windowWordCount = dag.addOperator("windowWordCount", new WindowWordCount()); - FileWordCount fileWordCount = dag.addOperator("fileWordCount", new FileWordCount()); - WordCountWriter wcWriter = dag.addOperator("wcWriter", new WordCountWriter()); - ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); - console.setStringFormat("wordCount: %s"); - - // create streams - - dag.addStream("lines", lineReader.output, wordReader.input); - dag.addStream("control", lineReader.control, fileWordCount.control); - dag.addStream("words", wordReader.output, windowWordCount.input); - dag.addStream("windowWordCounts", windowWordCount.output, fileWordCount.input); - dag.addStream("fileWordCounts", fileWordCount.fileOutput, wcWriter.input); - - String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); - - if (!StringUtils.isEmpty(gatewayAddress)) { // add query support - URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); - - AppDataSnapshotServerMap snapshotServerFile - = dag.addOperator("snapshotServerFile", new AppDataSnapshotServerMap()); - AppDataSnapshotServerMap snapshotServerGlobal - = dag.addOperator("snapshotServerGlobal", new AppDataSnapshotServerMap()); - - String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); - snapshotServerFile.setSnapshotSchemaJSON(snapshotServerJSON); - snapshotServerGlobal.setSnapshotSchemaJSON(snapshotServerJSON); - - PubSubWebSocketAppDataQuery wsQueryFile = new 
PubSubWebSocketAppDataQuery(); - PubSubWebSocketAppDataQuery wsQueryGlobal = new PubSubWebSocketAppDataQuery(); - wsQueryFile.setUri(uri); - wsQueryGlobal.setUri(uri); - - snapshotServerFile.setEmbeddableQueryInfoProvider(wsQueryFile); - snapshotServerGlobal.setEmbeddableQueryInfoProvider(wsQueryGlobal); - - PubSubWebSocketAppDataResult wsResultFile - = dag.addOperator("wsResultFile", new PubSubWebSocketAppDataResult()); - PubSubWebSocketAppDataResult wsResultGlobal - = dag.addOperator("wsResultGlobal", new PubSubWebSocketAppDataResult()); - wsResultFile.setUri(uri); - wsResultGlobal.setUri(uri); - - Operator.InputPort<String> queryResultFilePort = wsResultFile.input; - Operator.InputPort<String> queryResultGlobalPort = wsResultGlobal.input; - - dag.addStream("WordCountsFile", fileWordCount.outputPerFile, snapshotServerFile.input, console.input); - dag.addStream("WordCountsGlobal", fileWordCount.outputGlobal, snapshotServerGlobal.input); - - dag.addStream("ResultFile", snapshotServerFile.queryResult, queryResultFilePort); - dag.addStream("ResultGlobal", snapshotServerGlobal.queryResult, queryResultGlobalPort); - } else { - //throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS"); - dag.addStream("WordCounts", fileWordCount.outputPerFile, console.input); - } - - LOG.info("done with populateDAG, isDebugEnabled = " + LOG.isDebugEnabled()); - LOG.info("Returning from populateDAG"); - } - -}
