All five Flink examples passing
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/9dcdf645 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/9dcdf645 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/9dcdf645 Branch: refs/heads/master Commit: 9dcdf645080302d2f8e1bc7dc3d312817d459cf5 Parents: 58fefc0 Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Wed Oct 5 16:42:25 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Wed Oct 5 16:42:25 2016 -0500 ---------------------------------------------------------------------- flink/flink-twitter-collection/pom.xml | 28 ++++--- .../FlinkTwitterFollowingPipeline.scala | 2 +- .../collection/FlinkTwitterPostsPipeline.scala | 2 +- .../FlinkTwitterSpritzerPipeline.scala | 28 +++++-- .../FlinkTwitterUserInformationPipeline.scala | 2 +- .../markdown/FlinkTwitterSpritzerPipeline.md | 6 +- .../src/site/markdown/index.md | 6 +- .../resources/FlinkTwitterFollowingPipeline.dot | 37 +++++++++ .../resources/FlinkTwitterPostsPipeline.dot | 37 +++++++++ .../resources/FlinkTwitterSpritzerPipeline.dot | 33 ++++++++ .../FlinkTwitterUserInformationPipeline.dot | 37 +++++++++ ...linkTwitterFollowingPipelineFollowersIT.conf | 6 +- .../FlinkTwitterFollowingPipelineFriendsIT.conf | 5 +- .../FlinkTwitterSpritzerPipelineIT.conf | 15 ++++ .../FlinkTwitterUserInformationPipelineIT.conf | 2 +- ...inkTwitterFollowingPipelineFollowersIT.scala | 55 +++++++++++++ ...FlinkTwitterFollowingPipelineFriendsIT.scala | 59 ++++++++++++++ .../test/FlinkTwitterFollowingPipelineIT.scala | 86 -------------------- .../test/FlinkTwitterSpritzerPipelineIT.scala | 9 +- 19 files changed, 336 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/pom.xml ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml index 2d35035..4cf0b89 100644 --- a/flink/flink-twitter-collection/pom.xml +++ b/flink/flink-twitter-collection/pom.xml @@ -448,16 +448,24 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> - <version>2.12.4</version> - <executions> - <execution> - <id>integration-tests</id> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - </execution> - </executions> + <configuration> + <!-- Run integration test suite rather than individual tests. --> + <excludes> + <exclude>**/*Test.java</exclude> + <exclude>**/*Tests.java</exclude> + </excludes> + <includes> + <include>**/*IT.java</include> + <include>**/*ITs.java</include> + </includes> + </configuration> + <dependencies> + <dependency> + <groupId>org.apache.maven.surefire</groupId> + <artifactId>surefire-testng</artifactId> + <version>${failsafe.plugin.version}</version> + </dependency> + </dependencies> </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index 2fd9336..a20078e 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -126,7 +126,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio // if( test == true ) jsons.print(); - env.execute("FlinkTwitterFollowingPipeline") + env.execute(STREAMS_ID) } class FollowingCollectorFlatMapFunction( http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index beea973..bb7d54c 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -134,7 +134,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new // if( test == true ) jsons.print(); - env.execute("FlinkTwitterPostsPipeline") + env.execute(STREAMS_ID) } class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable { http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala index b615806..d6ed3df 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala @@ -1,10 +1,12 @@ package org.apache.streams.examples.flink.twitter.collection +import java.io.Serializable import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.base.{Preconditions, Strings} import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.api.common.functions.StoppableFunction import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.FileSystem import org.apache.flink.streaming.api.TimeCharacteristic @@ -21,6 +23,7 @@ import org.apache.streams.twitter.TwitterStreamConfiguration import org.apache.streams.twitter.provider.TwitterStreamProvider import org.slf4j.{Logger, LoggerFactory} import org.apache.flink.api.scala._ +import org.apache.streams.twitter.converter.TwitterDateTimeFormat import scala.collection.JavaConversions._ @@ -82,6 +85,8 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration import FlinkTwitterSpritzerPipeline._ + val spritzerSource = new SpritzerSource(config.getTwitter) + override def run(): Unit = { val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) @@ -91,7 +96,7 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration val outPath = buildWriterPath(config.getDestination) - val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter)); + val streamSource : DataStream[String] = env.addSource(spritzerSource); if( config.getTest == false 
) streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs") @@ -101,15 +106,23 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration // if( test == true ) jsons.print(); - env.execute("FlinkTwitterPostsPipeline") + env.execute(STREAMS_ID) + + } + + def stop(): Unit = { + spritzerSource.stop() } - class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable { + class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction { + + var mapper: ObjectMapper = _ var twitProvider: TwitterStreamProvider = _ @throws[Exception] override def open(parameters: Configuration): Unit = { + mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT) twitProvider = new TwitterStreamProvider( sourceConfig ) twitProvider.prepare(twitProvider) twitProvider.startStream() @@ -120,17 +133,16 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration do { Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) iterator = twitProvider.readCurrent().iterator() - iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String])) + iterator.toList.map(datum => ctx.collect(mapper.writeValueAsString(datum.getDocument))) } while( twitProvider.isRunning ) } override def cancel(): Unit = { - twitProvider.cleanUp() + close() } - @throws[Exception] - override def close(): Unit = { - twitProvider.cleanUp() + override def stop(): Unit = { + close(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala ---------------------------------------------------------------------- diff --git 
a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index 867255d..ad0315a 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -128,7 +128,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline LOGGER.info("StreamExecutionEnvironment: {}", env.toString ) - env.execute("FlinkTwitterUserInformationPipeline") + env.execute(STREAMS_ID) } class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] { http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md index 259fe7f..1e59039 100644 --- a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md +++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterSpritzerPipeline.md @@ -24,17 +24,17 @@ Example Configuration: Run (Local): ------------ - java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline + java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> 
org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline Run (Flink): ------------ - flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> + flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file> Run (YARN): ----------- - flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> + flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterSpritzerPipeline http://<location_of_config_file> [JavaDocs](apidocs/index.html "JavaDocs") http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md index 0f15603..24783be 100644 --- a/flink/flink-twitter-collection/src/site/markdown/index.md +++ b/flink/flink-twitter-collection/src/site/markdown/index.md @@ -16,11 +16,13 @@ Collects large batches of documents from api.twitter.com from a seed set of ids. 
Streams: -------- -<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a> +<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a> <a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a> -<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a> +<a href="FlinkTwitterSpritzerPipeline.html" target="_self">FlinkTwitterSpritzerPipeline</a> + +<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a> Test: ----- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot new file mode 100644 index 0000000..ba5e60d --- /dev/null +++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterFollowingPipeline.dot @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + digraph g { + + //source + source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab]; + + //providers + TwitterFollowingProvider [label="TwitterFollowingProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java"]; + + //persisters + RollingFileSink [label="RollingFileSink",shape=ellipse]; + + //data + destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab]; + + //stream + TwitterFollowingProvider -> source [dir=back,style=dashed]; + TwitterFollowingProvider -> RollingFileSink [label="String"]; + RollingFileSink -> destination; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot new file mode 100644 index 0000000..1092ff4 --- /dev/null +++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterPostsPipeline.dot @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + digraph g { + + //source + source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab]; + + //providers + TwitterTimelineProvider [label="TwitterTimelineProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java"]; + + //persisters + RollingFileSink [label="RollingFileSink",shape=ellipse]; + + //data + destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab]; + + //stream + TwitterTimelineProvider -> source [dir=back,style=dashed]; + TwitterTimelineProvider -> RollingFileSink [label="String"]; + RollingFileSink -> destination; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot new file mode 100644 index 0000000..5a57595 --- /dev/null +++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterSpritzerPipeline.dot @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + digraph g { + + //providers + TwitterStreamProvider [label="TwitterStreamProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java"]; + + //persisters + RollingFileSink [label="RollingFileSink",shape=ellipse]; + + //data + destination [label="hdfs://${host}:${port}/${path}/${writerPath}",shape=box]; + + //stream + TwitterStreamProvider -> RollingFileSink [label="String"]; + RollingFileSink -> destination; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot new file mode 100644 index 0000000..4a37234 --- /dev/null +++ b/flink/flink-twitter-collection/src/site/resources/FlinkTwitterUserInformationPipeline.dot @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + digraph g { + + //source + source [label="source\nhdfs://${host}:${port}/${path}/${readerPath}",shape=tab]; + + //providers + TwitterUserInformationProvider [label="TwitterUserInformationProvider",shape=ellipse,URL="https://github.com/apache/incubator-streams/blob/master/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java"]; + + //persisters + RollingFileSink [label="RollingFileSink",shape=ellipse]; + + //data + destination [label="destination\nhdfs://${host}:${port}/${path}/${writerPath}",shape=tab]; + + //stream + TwitterUserInformationProvider -> source [dir=back,style=dashed]; + TwitterUserInformationProvider -> RollingFileSink [label="String"]; + RollingFileSink -> destination; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf index 87057be..3e922ab 100644 --- 
a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf @@ -10,7 +10,11 @@ destination { path = "target/test-classes" writerPath = "FlinkTwitterFollowingPipelineFollowersIT" } -twitter.endpoint = friends +twitter { + endpoint = followers + ids_only = true + max_items = 5000 +} providerWaitMs = 1000 local = true test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf index b5212ed..038a8dc 100644 --- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf @@ -10,7 +10,10 @@ destination { path = "target/test-classes" writerPath = "FlinkTwitterFollowingPipelineFriendsIT" } -twitter.endpoint = friends +twitter { + endpoint = friends + ids_only = true +} providerWaitMs = 1000 local = true test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf new file mode 100644 index 0000000..fec4769 --- /dev/null +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterSpritzerPipelineIT.conf @@ -0,0 +1,15 @@ +destination { 
+ fields = ["DOC"] + scheme = file + path = "target/test-classes" + writerPath = "FlinkTwitterSpritzerPipelineIT" +} +twitter { + endpoint = sample + track = [ + "data" + ] +} +providerWaitMs = 1000 +local = true +test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf index 342a850..d3663fe 100644 --- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf @@ -2,7 +2,7 @@ source { fields = ["ID"] scheme = file path = "target/test-classes" - readerPath = "asf.txt" + readerPath = "1000twitterids.txt" } destination { fields = ["DOC"] http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala new file mode 100644 index 0000000..f38ad92 --- /dev/null +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala @@ -0,0 +1,55 @@ +package org.apache.streams.examples.flink.twitter.test + +import java.io.File +import java.nio.file.{Files, Paths} 
+ +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline +import org.scalatest.FlatSpec +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ +import org.slf4j.{Logger, LoggerFactory} +import org.testng.annotations.Test + +import scala.io.Source + +/** + * Created by sblackmon on 3/13/16. + */ +class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec { + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT]) + + import FlinkTwitterFollowingPipeline._ + + @Test + def flinkTwitterFollowersPipelineFollowersIT = { + + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) + + val job = new FlinkTwitterFollowingPipeline(config = testConfig) + val jobThread = new Thread(job) + jobThread.start + jobThread.join + + eventually (timeout(60 seconds), interval(1 seconds)) { + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) + assert( + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size + > 4000) + } + + } + 
+} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala new file mode 100644 index 0000000..464e743 --- /dev/null +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala @@ -0,0 +1,59 @@ +package com.peoplepattern.streams.twitter.collection + +import java.io.File +import java.nio.file.{Files, Paths} + +import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._ +import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline} +import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} +import org.junit.Ignore +import org.slf4j.{Logger, LoggerFactory} +import org.testng.annotations.Test + +import scala.io.Source +import org.scalatest.FlatSpec +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.{Seconds, Span} +import org.scalatest.time.SpanSugar._ + +/** + * Created by sblackmon on 3/13/16. 
+ */ +class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec { + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT]) + + import FlinkTwitterFollowingPipeline._ + + @Test + def flinkTwitterFollowersPipelineFriendsIT = { + + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) + + val job = new FlinkTwitterFollowingPipeline(config = testConfig) + val jobThread = new Thread(job) + jobThread.start + jobThread.join + + eventually (timeout(60 seconds), interval(1 seconds)) { + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) + assert( + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size + > 90) + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala deleted file mode 100644 index e6294f6..0000000 --- 
a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala +++ /dev/null @@ -1,86 +0,0 @@ -package com.peoplepattern.streams.twitter.collection - -import java.io.File -import java.nio.file.{Files, Paths} - -import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} -import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._ -import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline} -import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} -import org.slf4j.{Logger, LoggerFactory} -import org.testng.annotations.Test - -import scala.io.Source -import org.scalatest.FlatSpec -import org.scalatest.concurrent.Eventually._ -import org.scalatest.time.{Seconds, Span} -import org.scalatest.time.SpanSugar._ - -/** - * Created by sblackmon on 3/13/16. 
- */ -class FlinkTwitterFollowingPipelineIT extends FlatSpec { - - private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT]) - - import FlinkTwitterFollowingPipeline._ - - @Test(enabled = false) - def flinkTwitterFollowersPipelineFriendsIT = { - - val reference: Config = ConfigFactory.load() - val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf") - assert(conf_file.exists()) - val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - val typesafe: Config = testResourceConfig.withFallback(reference).resolve() - val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) - val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) - - setup(testConfig) - - val job = new FlinkTwitterFollowingPipeline(config = testConfig) - val jobThread = new Thread(job) - jobThread.start - jobThread.join - - eventually (timeout(60 seconds), interval(1 seconds)) { - assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) - assert( - Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size - > 90) - } - - } - - @Test(enabled = false) - def flinkTwitterFollowersPipelineFollowersIT = { - - val reference: Config = ConfigFactory.load() - val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf") - assert(conf_file.exists()) - val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - val typesafe: Config = testResourceConfig.withFallback(reference).resolve() - val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) - val testConfig = new 
ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) - - setup(testConfig) - - val job = new FlinkTwitterFollowingPipeline(config = testConfig) - val jobThread = new Thread(job) - jobThread.start - jobThread.join - - eventually (timeout(60 seconds), interval(1 seconds)) { - assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) - assert( - Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size - > 500) - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/9dcdf645/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala index f083f65..2e2e9b1 100644 --- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala @@ -26,7 +26,7 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec { import FlinkTwitterSpritzerPipeline._ - @Test(enabled = false) + @Test def flinkTwitterSpritzerPipelineIT = { val reference: Config = ConfigFactory.load() @@ -43,13 +43,14 @@ class FlinkTwitterSpritzerPipelineIT extends FlatSpec { val job = new FlinkTwitterSpritzerPipeline(config = testConfig) val jobThread = new Thread(job) jobThread.start - jobThread.join + jobThread.join(30000) + job.stop() - eventually (timeout(30 seconds), interval(1 seconds)) { + eventually 
(timeout(60 seconds), interval(1 seconds)) { assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) assert( Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size - >= 200) + >= 10) } }
