flink examples building and running
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/0112a838 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/0112a838 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/0112a838 Branch: refs/heads/master Commit: 0112a83874bb7f896a4e3964d5fde75e5967afe6 Parents: 4491cfe Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Thu Sep 29 19:15:39 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Thu Sep 29 19:15:39 2016 -0500 ---------------------------------------------------------------------- flink/flink-twitter-collection/pom.xml | 127 +++++++++++------ .../TwitterSpritzerPipelineConfiguration.json | 29 ++++ .../FlinkTwitterFollowingPipeline.scala | 11 +- .../collection/FlinkTwitterPostsPipeline.scala | 22 ++- .../FlinkTwitterSpritzerPipeline.scala | 138 +++++++++++++++++++ .../FlinkTwitterUserInformationPipeline.scala | 28 ++-- .../TwitterSpritzerPipelineConfiguration.json | 29 ++++ .../FlinkTwitterFollowingPipeline.conf | 10 -- ...linkTwitterFollowingPipelineFollowersIT.conf | 16 +++ .../FlinkTwitterFollowingPipelineFriendsIT.conf | 16 +++ .../resources/FlinkTwitterPostsPipeline.conf | 10 -- .../resources/FlinkTwitterPostsPipelineIT.conf | 15 ++ .../FlinkTwitterUserInformationPipeline.conf | 10 -- .../FlinkTwitterUserInformationPipelineIT.conf | 15 ++ .../test/FlinkTwitterFollowingPipelineIT.scala | 71 +++++----- .../test/FlinkTwitterPostsPipelineIT.scala | 38 ++--- .../test/FlinkTwitterSpritzerPipelineIT.scala | 57 ++++++++ .../FlinkTwitterUserInformationPipelineIT.scala | 33 +++-- flink/pom.xml | 3 - local/elasticsearch-hdfs/pom.xml | 14 +- local/elasticsearch-reindex/pom.xml | 2 +- local/mongo-elasticsearch-sync/pom.xml | 12 +- local/twitter-follow-graph/pom.xml | 10 +- local/twitter-history-elasticsearch/pom.xml | 14 +- local/twitter-userstream-elasticsearch/pom.xml | 14 +- 25 files changed, 543 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/pom.xml ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml index 33b05fe..2d35035 100644 --- a/flink/flink-twitter-collection/pom.xml +++ b/flink/flink-twitter-collection/pom.xml @@ -34,17 +34,38 @@ <description>Collects twitter documents using flink.</description> <properties> - <docker.repo>apachestreams</docker.repo> + <testng.version>6.9.10</testng.version> <hdfs.version>2.7.0</hdfs.version> <flink.version>1.1.2</flink.version> + <scala.version>2.10.6</scala.version> + <scalatest.version>2.2.5</scalatest.version> <scala.suffix>2.10</scala.suffix> + <scala-maven.plugin.version>3.2.2</scala-maven.plugin.version> </properties> <dependencies> <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>1.3</version> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.suffix}</artifactId> + <version>${scalatest.version}</version> <scope>test</scope> </dependency> <dependency> @@ -65,13 +86,11 @@ <groupId>org.apache.streams</groupId> <artifactId>streams-util</artifactId> <version>${project.version}</version> - <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> <version>${project.version}</version> - <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -277,6 +296,19 @@ <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <version>${testng.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + </dependencies> <build> @@ -293,17 +325,6 @@ </testResource> </testResources> <plugins> - <plugin> - <artifactId>maven-clean-plugin</artifactId> - <configuration> - <filesets> - <fileset> - <directory>data</directory> - <followSymlinks>false</followSymlinks> - </fileset> - </filesets> - </configuration> - </plugin> <!-- This binary runs with logback --> <!-- Keep log4j out --> <plugin> @@ -334,8 +355,56 @@ </executions> </plugin> <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>${scala-maven.plugin.version}</version> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>${project.build.finalName}</finalName> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + <exclude>**/logback.xml</exclude> + <exclude>**/log4j.properties</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> </plugin> <plugin> <groupId>org.jsonschema2pojo</groupId> @@ -348,7 +417,6 @@ <sourcePath>src/main/jsonschema</sourcePath> </sourcePaths> <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> - <targetPackage>org.apache.streams.example.elasticsearch</targetPackage> <useJodaDates>false</useJodaDates> </configuration> <executions> @@ -379,25 +447,6 @@ </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <configuration> - <includes>**/*.json</includes> - <outputDirectory>${project.build.directory}/test-classes</outputDirectory> - <includeGroupIds>org.apache.streams</includeGroupIds> - <includeTypes>test-jar</includeTypes> - </configuration> - <executions> - <execution> - <id>test-resource-dependencies</id> - <phase>process-test-resources</phase> - <goals> - <goal>unpack-dependencies</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> <version>2.12.4</version> <executions> @@ -410,10 +459,6 @@ </execution> </executions> </plugin> - <plugin> - <groupId>io.fabric8</groupId> - <artifactId>docker-maven-plugin</artifactId> - </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json new file mode 100644 index 0000000..49d0d1e --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterSpritzerPipelineConfiguration.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "FlinkStreamingConfiguration.json" + }, + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration" + }, + "source": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + }, + "destination": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "providerWaitMs": { + "type": "integer" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index 2ac7d32..2fd9336 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -3,6 +3,7 @@ package org.apache.streams.examples.flink.twitter.collection import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.base.{Preconditions, Strings} import com.google.common.util.concurrent.Uninterruptibles import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.core.fs.FileSystem @@ -17,10 +18,10 @@ import org.apache.streams.twitter.TwitterFollowingConfiguration import org.apache.streams.twitter.pojo.Follow import org.apache.streams.twitter.provider.TwitterFollowingProvider import org.slf4j.{Logger, LoggerFactory} -import org.apache.flink.api.scala._ import org.apache.streams.examples.flink.FlinkBase import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration} +import org.apache.flink.api.scala._ import scala.collection.JavaConversions._ @@ -75,6 +76,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase { return false } + Preconditions.checkNotNull(jobConfig.getTwitter.getOauth) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) + return true } @@ -134,7 +141,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio def collectConnections(id : String, out : Collector[StreamsDatum]) = { val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider( - twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration] + twitterConfiguration.withIdsOnly(true).withInfo(List(toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration] ) twitProvider.prepare(twitProvider) twitProvider.startStream() http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index f8e221c..beea973 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -3,11 +3,8 @@ package org.apache.streams.examples.flink.twitter.collection import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.base.{Preconditions, Strings} import com.google.common.util.concurrent.Uninterruptibles -import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration -import com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil} -import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration -import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.scala.{ExecutionEnvironment, _} @@ -33,6 +30,7 @@ import org.apache.streams.twitter.TwitterUserInformationConfiguration import org.apache.streams.twitter.pojo.{Tweet, User} import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider} import org.slf4j.{Logger, LoggerFactory} +import org.apache.flink.api.scala._ import scala.collection.JavaConversions._ @@ -84,6 +82,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase { return false } + Preconditions.checkNotNull(jobConfig.getTwitter.getOauth) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) + return true } @@ -105,16 +109,8 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new val outPath = buildWriterPath(config.getDestination) - //val inProps = buildKafkaProps(config.getSourceTopic) - val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids") - //val idTopicIn = new KafkaSink() - -// val idTopicOut : DataStream[String] = env.addSource[String]( -// new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(), -// inProps)); - val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs ) // these datums contain 'Tweet' objects @@ -149,7 +145,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new val twitterConfiguration = config.getTwitter val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider( - twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l) + twitterConfiguration.withInfo(List(toProviderId(id))).withMaxItems(200l) ) twitProvider.prepare(twitProvider) twitProvider.startStream() http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala new file mode 100644 index 0000000..b615806 --- /dev/null +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala @@ -0,0 +1,138 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.util.concurrent.TimeUnit + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.base.{Preconditions, Strings} +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.configuration.Configuration +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} +import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} +import org.apache.flink.streaming.connectors.fs.RollingSink +import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.core.StreamsDatum +import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration +import org.apache.streams.flink.FlinkStreamingConfiguration +import org.apache.streams.jackson.StreamsJacksonMapper +import org.apache.streams.twitter.TwitterStreamConfiguration +import org.apache.streams.twitter.provider.TwitterStreamProvider +import org.slf4j.{Logger, LoggerFactory} +import org.apache.flink.api.scala._ + +import scala.collection.JavaConversions._ + +/** + * Created by sblackmon on 7/29/15. + */ +object FlinkTwitterSpritzerPipeline extends FlinkBase { + + val STREAMS_ID: String = "FlinkTwitterSpritzerPipeline" + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline]) + private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() + + override def main(args: Array[String]) = { + super.main(args) + val jobConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe) + if( setup(jobConfig) == false ) System.exit(1) + val pipeline: FlinkTwitterSpritzerPipeline = new FlinkTwitterSpritzerPipeline(jobConfig) + val thread = new Thread(pipeline) + thread.start() + thread.join() + } + + def setup(jobConfig: TwitterSpritzerPipelineConfiguration): Boolean = { + + LOGGER.info("TwitterSpritzerPipelineConfiguration: " + jobConfig) + + if( jobConfig == null ) { + LOGGER.error("jobConfig is null!") + System.err.println("jobConfig is null!") + return false + } + + if( jobConfig.getDestination == null ) { + LOGGER.error("jobConfig.getDestination is null!") + System.err.println("jobConfig.getDestination is null!") + return false + } + + if( jobConfig.getTwitter == null ) { + LOGGER.error("jobConfig.getTwitter is null!") + System.err.println("jobConfig.getTwitter is null!") + return false + } + + Preconditions.checkNotNull(jobConfig.getTwitter.getOauth) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) + + return true + + } + +} + +class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable { + + import FlinkTwitterSpritzerPipeline._ + + override def run(): Unit = { + + val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) + + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setNumberOfExecutionRetries(0) + + val outPath = buildWriterPath(config.getDestination) + + val streamSource : DataStream[String] = env.addSource(new SpritzerSource(config.getTwitter)); + + if( config.getTest == false ) + streamSource.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs") + else + streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + .setParallelism(env.getParallelism); + + // if( test == true ) jsons.print(); + + env.execute("FlinkTwitterPostsPipeline") + } + + class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable { + + var twitProvider: TwitterStreamProvider = _ + + @throws[Exception] + override def open(parameters: Configuration): Unit = { + twitProvider = new TwitterStreamProvider( sourceConfig ) + twitProvider.prepare(twitProvider) + twitProvider.startStream() + } + + override def run(ctx: SourceFunction.SourceContext[String]): Unit = { + var iterator: Iterator[StreamsDatum] = null + do { + Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) + iterator = twitProvider.readCurrent().iterator() + iterator.toList.map(datum => ctx.collect(datum.getDocument.asInstanceOf[String])) + } while( twitProvider.isRunning ) + } + + override def cancel(): Unit = { + twitProvider.cleanUp() + } + + @throws[Exception] + override def close(): Unit = { + twitProvider.cleanUp() + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index a081c74..867255d 100644 --- a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -1,28 +1,18 @@ package org.apache.streams.examples.flink.twitter.collection -import java.lang import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.base.{Preconditions, Strings} import org.apache.flink.core.fs.FileSystem import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows} - -import scala.collection.JavaConversions._ -import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER import com.google.common.util.concurrent.Uninterruptibles -import org.apache.streams.examples.flink.FlinkBase import org.apache.flink.api.common.functions.RichFlatMapFunction -import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream} -import org.apache.flink.streaming.api.windowing.time.Time -import org.apache.flink.streaming.api.windowing.triggers._ import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.{ExecutionEnvironment, _} -import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.connectors.fs.RollingSink import org.apache.flink.util.Collector import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} @@ -30,13 +20,13 @@ import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration import org.apache.streams.flink.FlinkStreamingConfiguration -import org.apache.streams.hdfs.HdfsConfiguration import org.apache.streams.jackson.StreamsJacksonMapper -import org.apache.streams.twitter.TwitterUserInformationConfiguration -import org.apache.streams.twitter.pojo.{Tweet, User} -import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider} +import org.apache.streams.twitter.pojo.User +import org.apache.streams.twitter.provider.TwitterUserInformationProvider import org.slf4j.{Logger, LoggerFactory} +import scala.collection.JavaConversions._ + /** * Created by sblackmon on 3/15/16. */ @@ -85,6 +75,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase { return false } + Preconditions.checkNotNull(jobConfig.getTwitter.getOauth) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessToken)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getAccessTokenSecret)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerKey)) + Preconditions.checkArgument(!Strings.isNullOrEmpty(jobConfig.getTwitter.getOauth.getConsumerSecret)) + return true } @@ -137,7 +133,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] { override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 ) - out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList) + out.collect(input.map(id => toProviderId(id)).toList) } } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json new file mode 100644 index 0000000..49d0d1e --- /dev/null +++ b/flink/flink-twitter-collection/src/site/resources/TwitterSpritzerPipelineConfiguration.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "FlinkStreamingConfiguration.json" + }, + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterStreamConfiguration" + }, + "source": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + }, + "destination": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "providerWaitMs": { + "type": "integer" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf deleted file mode 100644 index e74f00c..0000000 --- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipeline.conf +++ /dev/null @@ -1,10 +0,0 @@ -twitter { - endpoint = followers - version = 1.1 - oauth { - consumerKey = "" - consumerSecret = "" - accessToken = "" - accessTokenSecret = "" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf new file mode 100644 index 0000000..87057be --- /dev/null +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf @@ -0,0 +1,16 @@ +source { + fields = ["ID"] + scheme = file + path = "target/test-classes" + readerPath = "asf.txt" +} +destination { + fields = ["DOC"] + scheme = file + path = "target/test-classes" + writerPath = "FlinkTwitterFollowingPipelineFollowersIT" +} +twitter.endpoint = friends +providerWaitMs = 1000 +local = true +test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf new file mode 100644 index 0000000..b5212ed --- /dev/null +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf @@ -0,0 +1,16 @@ +source { + fields = ["ID"] + scheme = file + path = "target/test-classes" + readerPath = "asf.txt" +} +destination { + fields = ["DOC"] + scheme = file + path = "target/test-classes" + writerPath = "FlinkTwitterFollowingPipelineFriendsIT" +} +twitter.endpoint = friends +providerWaitMs = 1000 +local = true +test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf deleted file mode 100644 index 63a6481..0000000 --- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipeline.conf +++ /dev/null @@ -1,10 +0,0 @@ -twitter { - version = 1.1 - endpoint = statuses - oauth { - consumerKey = "" - consumerSecret = "" - accessToken = "" - accessTokenSecret = "" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf new file mode 100644 index 0000000..6954113 --- /dev/null +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf @@ -0,0 +1,15 @@ +source { + fields = ["ID"] + scheme = file + path = "target/test-classes" + readerPath = "asf.txt" +} +destination { + fields = ["DOC"] + scheme = file + path = "target/test-classes" + writerPath = "FlinkTwitterPostsPipelineIT" +} +providerWaitMs = 1000 +local = true +test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf deleted file mode 100644 index 6e0a879..0000000 --- a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipeline.conf +++ /dev/null @@ -1,10 +0,0 @@ -twitter { - version = 1.1 - endpoint = users - oauth { - consumerKey = "" - consumerSecret = "" - accessToken = "" - accessTokenSecret = "" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf new file mode 100644 index 0000000..342a850 --- /dev/null +++ b/flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf @@ -0,0 +1,15 @@ +source { + fields = ["ID"] + scheme = file + path = "target/test-classes" + readerPath = "asf.txt" +} +destination { + fields = ["DOC"] + scheme = file + path = "target/test-classes" + writerPath = "FlinkTwitterUserInformationPipelineIT" +} +providerWaitMs = 1000 +local = true +test = true http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala index aa2b1a9..b051e90 100644 --- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineIT.scala @@ -1,17 +1,22 @@ package com.peoplepattern.streams.twitter.collection +import java.io.File import java.nio.file.{Files, Paths} -import com.peoplepattern.streams.pipelines.pdb.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration} -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._ +import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterFollowingPipeline, FlinkTwitterSpritzerPipeline} import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} -import org.scalatest.FlatSpec -import org.scalatest.concurrent.Eventually._ -import org.scalatest.time.SpanSugar._ import org.slf4j.{Logger, LoggerFactory} import org.testng.annotations.Test import scala.io.Source +import org.scalatest.FlatSpec +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.{Seconds, Span} +import org.scalatest.time.SpanSugar._ /** * Created by sblackmon on 3/13/16. @@ -20,30 +25,31 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineIT]) + import FlinkTwitterFollowingPipeline._ + @Test def flinkTwitterFollowersPipelineFriendsIT = { - val testConfig : TwitterFollowingPipelineConfiguration = - new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig) - testConfig.getTwitter.setEndpoint("friends") - val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration] - source.setPath("target/test-classes") - testConfig.setSource(source); - val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration] - destination.setPath("target/test-classes") - testConfig.setDestination(destination) - testConfig.setProviderWaitMs(1000l) - testConfig.setTest(true) + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFollowersIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) val job = new FlinkTwitterFollowingPipeline(config = testConfig) val jobThread = new Thread(job) jobThread.start jobThread.join - eventually (timeout(30 seconds), interval(1 seconds)) { - assert(Files.exists(Paths.get("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends"))) + eventually (timeout(60 seconds), interval(1 seconds)) { + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) assert( - Source.fromFile("target/test-classes/pdb-twitter-collect/FlinkTwitterFollowingPipeline/friends", "UTF-8").getLines.size + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size > 90) } @@ -52,27 +58,26 @@ class FlinkTwitterFollowingPipelineIT extends FlatSpec { @Test def flinkTwitterFollowersPipelineFollowersIT = { - val testConfig : TwitterFollowingPipelineConfiguration = - new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig) - testConfig.getTwitter.setEndpoint("followers") - val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration] - source.setPath("target/test-classes") - testConfig.setSource(source); - val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterFollowingPipeline/followers").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration] - destination.setPath("target/test-classes") - testConfig.setDestination(destination) - testConfig.setProviderWaitMs(1000l) - testConfig.setTest(true) + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterFollowingPipelineFriendsIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) val job = new FlinkTwitterFollowingPipeline(config = testConfig) val jobThread = new Thread(job) jobThread.start jobThread.join - eventually (timeout(30 seconds), interval(1 seconds)) { - assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterFollowingPipeline/followers"))) + eventually (timeout(60 seconds), interval(1 seconds)) { + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) assert( - Source.fromFile("target/test-classes/FlinkTwitterFollowingPipeline/followers", "UTF-8").getLines.size + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size > 500) } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala index 8a942e5..a355696 100644 --- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala @@ -1,12 +1,16 @@ package com.peoplepattern.streams.twitter.collection +import java.io.File import java.nio.file.{Files, Paths} import java.util.concurrent.TimeUnit import com.google.common.util.concurrent.{Monitor, Uninterruptibles} -import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} +import org.apache.streams.examples.flink.twitter.{TwitterFollowingPipelineConfiguration, TwitterPostsPipelineConfiguration} +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._ +import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterPostsPipeline, FlinkTwitterUserInformationPipeline} import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} import org.slf4j.{Logger, LoggerFactory} @@ -20,23 +24,25 @@ import org.testng.annotations.Test /** * Created by sblackmon on 3/13/16. */ -class FlinkTwitterPostsPipelineIT extends FlatSpec { +class FlinkTwitterPostsPipelineIT extends FlatSpec { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT]) + import FlinkTwitterPostsPipeline._ + @Test def flinkTwitterPostsPipelineIT = { - val testConfig : TwitterPostsPipelineConfiguration = - new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig) - val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("asf.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration] - source.setPath("target/test-classes") - testConfig.setSource(source); - val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/FlinkTwitterPostsPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration] - destination.setPath("target/test-classes") - testConfig.setDestination(destination) - testConfig.setProviderWaitMs(1000l) - testConfig.setTest(true) + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterPostsPipelineIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) val job = new FlinkTwitterPostsPipeline(config = testConfig) val jobThread = new Thread(job) @@ -44,9 +50,9 @@ class FlinkTwitterPostsPipelineIT extends FlatSpec { jobThread.join eventually (timeout(30 seconds), interval(1 seconds)) { - assert(Files.exists(Paths.get("target/test-classes/FlinkTwitterPostsPipeline"))) + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) assert( - Source.fromFile("target/test-classes/FlinkTwitterPostsPipeline", "UTF-8").getLines.size + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size >= 200) } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala new file mode 100644 index 0000000..f083f65 --- /dev/null +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterSpritzerPipelineIT.scala @@ -0,0 +1,57 @@ +package org.apache.streams.examples.flink.twitter.test + +import java.io.File +import java.nio.file.{Files, Paths} + +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline._ +import org.apache.streams.examples.flink.twitter.collection.{FlinkTwitterSpritzerPipeline, FlinkTwitterUserInformationPipeline} +import org.apache.streams.examples.flink.twitter.{TwitterPostsPipelineConfiguration, TwitterSpritzerPipelineConfiguration} +import org.slf4j.{Logger, LoggerFactory} +import org.testng.annotations.Test + +import scala.io.Source +import org.scalatest.FlatSpec +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.{Seconds, Span} +import org.scalatest.time.SpanSugar._ + +/** + * Created by sblackmon on 3/13/16. + */ +class FlinkTwitterSpritzerPipelineIT extends FlatSpec { + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterSpritzerPipelineIT]) + + import FlinkTwitterSpritzerPipeline._ + + @Test(enabled = false) + def flinkTwitterSpritzerPipelineIT = { + + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterSpritzerPipelineIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterSpritzerPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) + + val job = new FlinkTwitterSpritzerPipeline(config = testConfig) + val jobThread = new Thread(job) + jobThread.start + jobThread.join + + eventually (timeout(30 seconds), interval(1 seconds)) { + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) + assert( + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size + >= 200) + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala index 3d21244..2ca8650 100644 --- a/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala +++ b/flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala @@ -1,10 +1,13 @@ package com.peoplepattern.streams.twitter.collection +import java.io.File import java.nio.file.{Files, Paths} -import com.peoplepattern.streams.pipelines.pdb.{TwitterPostsPipelineConfiguration, TwitterUserInformationPipelineConfiguration} +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} +import org.apache.streams.examples.flink.twitter.{TwitterSpritzerPipelineConfiguration, TwitterUserInformationPipelineConfiguration} import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.config.{ComponentConfigurator, StreamsConfiguration, StreamsConfigurator} +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} import org.scalatest.FlatSpec import org.scalatest._ @@ -25,19 +28,21 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT]) + import FlinkTwitterUserInformationPipeline._ + @Test def flinkTwitterUserInformationPipelineIT = { - val testConfig : TwitterUserInformationPipelineConfiguration = - new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig) - val source : HdfsReaderConfiguration = new HdfsReaderConfiguration().withReaderPath("1000twitterids.txt").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsReaderConfiguration] - source.setPath("target/test-classes") - testConfig.setSource(source); - val destination : HdfsWriterConfiguration = new HdfsWriterConfiguration().withWriterPath("pdb-twitter-collect/TwitterUserInformationPipeline").withScheme(HdfsConfiguration.Scheme.FILE).asInstanceOf[HdfsWriterConfiguration] - destination.setPath("target/test-classes") - testConfig.setDestination(destination) - testConfig.setProviderWaitMs(1000l) - testConfig.setTest(true) + val reference: Config = ConfigFactory.load() + val conf_file: File = new File("target/test-classes/FlinkTwitterUserInformationPipelineIT.conf") + assert(conf_file.exists()) + val testResourceConfig: Config = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + val typesafe: Config = testResourceConfig.withFallback(reference).resolve() + val streams: StreamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe) + val testConfig = new ComponentConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe) + + setup(testConfig) val job = new FlinkTwitterUserInformationPipeline(config = testConfig) val jobThread = new Thread(job) @@ -45,9 +50,9 @@ class FlinkTwitterUserInformationPipelineIT extends FlatSpec { jobThread.join eventually (timeout(30 seconds), interval(1 seconds)) { - assert(Files.exists(Paths.get("target/test-classes/TwitterUserInformationPipeline"))) + assert(Files.exists(Paths.get(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath))) assert( - Source.fromFile("target/test-classes/TwitterUserInformationPipeline", "UTF-8").getLines.size + Source.fromFile(testConfig.getDestination.getPath + "/" + testConfig.getDestination.getWriterPath, "UTF-8").getLines.size > 500) } http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index 7054e89..6c50ca2 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -41,7 +41,4 @@ <module>flink-twitter-collection</module> </modules> - <build> - - </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/local/elasticsearch-hdfs/pom.xml b/local/elasticsearch-hdfs/pom.xml index 7b653fc..52cd0fc 100644 --- a/local/elasticsearch-hdfs/pom.xml +++ b/local/elasticsearch-hdfs/pom.xml @@ -67,7 +67,7 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>com.typesafe</groupId> @@ -76,34 +76,34 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-util</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-runtime-local</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-elasticsearch</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-hdfs</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/elasticsearch-reindex/pom.xml ---------------------------------------------------------------------- diff --git a/local/elasticsearch-reindex/pom.xml b/local/elasticsearch-reindex/pom.xml index e81cbe2..325e564 100644 --- a/local/elasticsearch-reindex/pom.xml +++ b/local/elasticsearch-reindex/pom.xml @@ -92,7 +92,7 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-elasticsearch</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/mongo-elasticsearch-sync/pom.xml ---------------------------------------------------------------------- diff --git a/local/mongo-elasticsearch-sync/pom.xml b/local/mongo-elasticsearch-sync/pom.xml index 318c47e..d268ed7 100644 --- a/local/mongo-elasticsearch-sync/pom.xml +++ b/local/mongo-elasticsearch-sync/pom.xml @@ -66,7 +66,7 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>com.typesafe</groupId> @@ -75,27 +75,27 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-runtime-local</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-elasticsearch</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-mongo</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-follow-graph/pom.xml ---------------------------------------------------------------------- diff --git a/local/twitter-follow-graph/pom.xml b/local/twitter-follow-graph/pom.xml index d40adde..9bf980d 100644 --- a/local/twitter-follow-graph/pom.xml +++ b/local/twitter-follow-graph/pom.xml @@ -49,17 +49,17 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-runtime-local</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-twitter</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> @@ -70,12 +70,12 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-graph</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-history-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/local/twitter-history-elasticsearch/pom.xml b/local/twitter-history-elasticsearch/pom.xml index afc8cf0..ba6dbe7 100644 --- a/local/twitter-history-elasticsearch/pom.xml +++ b/local/twitter-history-elasticsearch/pom.xml @@ -69,7 +69,7 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>com.typesafe</groupId> @@ -78,29 +78,29 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-util</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-runtime-local</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-twitter</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <exclusions> <exclusion> <groupId>commons-logging</groupId> @@ -111,7 +111,7 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-elasticsearch</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/0112a838/local/twitter-userstream-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/local/twitter-userstream-elasticsearch/pom.xml b/local/twitter-userstream-elasticsearch/pom.xml index 224bdd4..1b7b64f 100644 --- a/local/twitter-userstream-elasticsearch/pom.xml +++ b/local/twitter-userstream-elasticsearch/pom.xml @@ -67,7 +67,7 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>com.typesafe</groupId> @@ -76,32 +76,32 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-runtime-local</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-filters</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-provider-twitter</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-persist-elasticsearch</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> - <version>0.3-incubating</version> + <version>0.4-incubating-SNAPSHOT</version> <type>test-jar</type> </dependency> <dependency>
