Repository: incubator-gearpump Updated Branches: refs/heads/master ead442cba -> 504bcf39c
[GEARPUMP-377] Add TwitterSource and examples Author: manuzhang <[email protected]> Closes #247 from manuzhang/twitter_source. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/504bcf39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/504bcf39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/504bcf39 Branch: refs/heads/master Commit: 504bcf39cd7c64127d3290f56c644f07bf5dd7b5 Parents: ead442c Author: manuzhang <[email protected]> Authored: Tue May 8 10:22:32 2018 +0800 Committer: manuzhang <[email protected]> Committed: Tue May 8 10:22:36 2018 +0800 ---------------------------------------------------------------------- .../examples/twitter/TwitterExamples.scala | 73 ++++++++++ .../streaming/twitter/TwitterSource.scala | 135 +++++++++++++++++++ .../streaming/twitter/TwitterSourceSpec.scala | 64 +++++++++ project/BuildExamples.scala | 9 +- project/BuildExternals.scala | 12 ++ 5 files changed, 292 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala b/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala new file mode 100644 index 0000000..0b8722e --- /dev/null +++ b/examples/streaming/twitter/src/main/scala/org/apache/gearpump/streaming/examples/twitter/TwitterExamples.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.twitter + +import java.time.Duration + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows} +import org.apache.gearpump.streaming.twitter.TwitterSource +import org.apache.gearpump.util.AkkaApp +import twitter4j.conf.ConfigurationBuilder + +object TwitterExamples extends AkkaApp with ArgumentsParser { + + val CONSUMER_KEY = "consumer-key" + val CONSUMER_SECRET = "consumer-secret" + val TOKEN = "token" + val TOKEN_SECRET = "token-secret" + + override val options: Array[(String, CLIOption[Any])] = Array( + CONSUMER_KEY -> CLIOption[String]("consumer key", required = true), + CONSUMER_SECRET -> CLIOption[String]("consumer secret", required = true), + TOKEN -> CLIOption[String]("token", required = true), + TOKEN_SECRET -> CLIOption[String]("token secret", required = true) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + val twitterConf = new ConfigurationBuilder() + .setOAuthConsumerKey(config.getString(CONSUMER_KEY)) + 
.setOAuthConsumerSecret(config.getString(CONSUMER_SECRET)) + .setOAuthAccessToken(config.getString(TOKEN)) + .setOAuthAccessTokenSecret(config.getString(TOKEN_SECRET)) + .build() + + val twitterSource = TwitterSource(twitterConf) + + val context: ClientContext = ClientContext(akkaConf) + val app = StreamApp("TwitterExample", context) + + app.source[String](twitterSource) + .flatMap(tweet => tweet.split("[\\s]+")) + .filter(_.startsWith("#")) + .map((_, 1)) + .window(FixedWindows.apply(Duration.ofMinutes(1)).triggering(EventTimeTrigger)) + .groupBy(_._1) + .sum + .sink(new LoggerSink) + + context.submit(app).waitUntilFinish() + context.close() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala ---------------------------------------------------------------------- diff --git a/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala b/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala new file mode 100644 index 0000000..9fe94ea --- /dev/null +++ b/external/twitter/src/main/scala/org/apache/gearpump/streaming/twitter/TwitterSource.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.twitter + +import java.time.Instant +import java.util.concurrent.LinkedBlockingQueue + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.streaming.twitter.TwitterSource.{Factory, MessageListener} +import twitter4j._ +import twitter4j.conf.Configuration + +class TwitterSource private[twitter]( + twitterFactory: Factory, + filterQuery: Option[FilterQuery], + statusListener: MessageListener +) extends DataSource { + + private var twitterStream: TwitterStream = _ + + /** + * Opens connection to data source + * invoked in onStart() method of [[org.apache.gearpump.streaming.source.DataSourceTask]] + * + * @param context is the task context at runtime + * @param startTime is the start time of system + */ + override def open(context: TaskContext, startTime: Instant): Unit = { + + this.twitterStream = twitterFactory.getTwitterStream + this.twitterStream.addListener(statusListener) + + filterQuery match { + case Some(query) => + this.twitterStream.filter(query) + case None => + this.twitterStream.sample() + } + } + + /** + * Reads next message from data source and + * returns null if no message is available + * + * @return a [[org.apache.gearpump.Message]] or null + */ + override def read(): Message = { + Option(statusListener.poll()).map(status => + Message(status.getText, Instant.now())).orNull + } + + /** + * Closes connection to data source. 
+ * invoked in onStop() method of [[org.apache.gearpump.streaming.source.DataSourceTask]] + */ + override def close(): Unit = { + if (twitterStream != null) { + twitterStream.shutdown() + } + } + + /** + * Returns a watermark such that no timestamp earlier than the watermark should enter the system + * Watermark.MAX marks the end of source data + */ + override def getWatermark: Instant = { + Instant.now() + } +} + +object TwitterSource { + + class MessageListener extends StatusListener with Serializable { + + private val queue = new LinkedBlockingQueue[Status](100000) + + def poll(): Status = { + queue.poll() + } + + override def onStallWarning(warning: StallWarning): Unit = {} + + override def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice): Unit = {} + + override def onScrubGeo(userId: Long, upToStatusId: Long): Unit = {} + + override def onStatus(status: Status): Unit = { + queue.offer(status) + } + + override def onTrackLimitationNotice(numberOfLimitedStatuses: Int): Unit = {} + + override def onException(ex: Exception): Unit = { + throw ex + } + } + + /** + * Wrapper around TwitterStreamFactory, which is a final class and + * cannot be mocked + */ + class Factory(factory: TwitterStreamFactory) extends Serializable { + + def getTwitterStream: TwitterStream = { + factory.getInstance() + } + } + + def apply(conf: Configuration): TwitterSource = { + new TwitterSource(new Factory(new TwitterStreamFactory(conf)), + None, new MessageListener) + } + + def apply(conf: Configuration, query: FilterQuery): TwitterSource = { + new TwitterSource(new Factory(new TwitterStreamFactory(conf)), + Option(query), new MessageListener) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala ---------------------------------------------------------------------- diff --git
a/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala b/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala new file mode 100644 index 0000000..a7ac8fb --- /dev/null +++ b/external/twitter/src/main/test/scala/org/apache/gearpump/streaming/twitter/TwitterSourceSpec.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.twitter + +import java.time.Instant + +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.twitter.TwitterSource.{Factory, MessageListener} +import org.mockito.Mockito._ +import org.scalacheck.{Arbitrary, Gen} +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.prop.PropertyChecks +import twitter4j.{FilterQuery, TwitterStream} + +class TwitterSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + implicit val arbQuery: Arbitrary[Option[FilterQuery]] = Arbitrary { + Gen.oneOf(None, Some(new FilterQuery())) + } + + property("TwitterSource should properly setup, poll message and teardown") { + forAll { + (query: Option[FilterQuery], startTime: Long) => + val factory = mock[Factory] + val stream = mock[TwitterStream] + val listener = mock[MessageListener] + + when(factory.getTwitterStream).thenReturn(stream) + val twitterSource = new TwitterSource(factory, query, listener) + + twitterSource.open(MockUtil.mockTaskContext, Instant.ofEpochMilli(startTime)) + + verify(stream).addListener(listener) + query match { + case Some(q) => + verify(stream).filter(q) + case None => + verify(stream).sample() + } + + twitterSource.read() + verify(listener).poll() + + twitterSource.close() + verify(stream).shutdown() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/project/BuildExamples.scala ---------------------------------------------------------------------- diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala index 47aa0c6..7cc8807 100644 --- a/project/BuildExamples.scala +++ b/project/BuildExamples.scala @@ -37,7 +37,8 @@ object BuildExamples extends sbt.Build { wordcount, wordcountJava, example_hbase, - example_kudu + example_kudu, + example_twitter ) /** @@ -159,6 +160,12 @@ object BuildExamples extends sbt.Build { ).dependsOn(core % "provided", streaming % "provided; 
test->test", external_hadoopfs, external_monoid, external_serializer, external_kafka) + lazy val example_twitter = Project( + id = "gearpump-examples-twitter", + base = file("examples/streaming/twitter"), + settings = exampleSettings("org.apache.gearpump.streaming.examples.twitter.TwitterExamples") + ).dependsOn(core % "provided", streaming % "provided; test->test", external_twitter) + private def exampleSettings(className: String): Seq[Def.Setting[_]] = commonSettings ++ noPublish ++ myAssemblySettings ++ Seq( mainClass in(Compile, packageBin) := http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/504bcf39/project/BuildExternals.scala ---------------------------------------------------------------------- diff --git a/project/BuildExternals.scala b/project/BuildExternals.scala index 698af6c..6c1289c 100644 --- a/project/BuildExternals.scala +++ b/project/BuildExternals.scala @@ -134,4 +134,16 @@ object BuildExternals extends sbt.Build { )) .dependsOn(core % "provided", streaming % "test->test; provided") .disablePlugins(sbtassembly.AssemblyPlugin) + + lazy val external_twitter = Project( + id = "gearpump-external-twitter", + base = file("external/twitter"), + settings = commonSettings ++ javadocSettings ++ + Seq( + libraryDependencies ++= Seq( + "org.twitter4j" % "twitter4j-stream" % "4.0.4" + ) + )) + .dependsOn(core % "provided", streaming % "test->test; provided") + .disablePlugins(sbtassembly.AssemblyPlugin) } \ No newline at end of file
