Repository: bahir Updated Branches: refs/heads/master d9b430a0c -> a45bd8421
[BAHIR-65] Twitter integration test Closes #80 Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/a45bd842 Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/a45bd842 Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/a45bd842 Branch: refs/heads/master Commit: a45bd84210b8e68640f97dd328e7e7053c8276e6 Parents: d9b430a Author: Lukasz Antoniak <[email protected]> Authored: Thu Dec 20 10:39:24 2018 -0800 Committer: Luciano Resende <[email protected]> Committed: Wed Jan 9 16:11:58 2019 -0800 ---------------------------------------------------------------------- streaming-twitter/README.md | 16 ++++- .../streaming/twitter/TwitterStreamSuite.scala | 69 +++++++++++++++++--- 2 files changed, 74 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/a45bd842/streaming-twitter/README.md ---------------------------------------------------------------------- diff --git a/streaming-twitter/README.md b/streaming-twitter/README.md index 0b3d37a..4123ea9 100644 --- a/streaming-twitter/README.md +++ b/streaming-twitter/README.md @@ -45,4 +45,18 @@ can be provided by any of the [methods](http://twitter4j.org/en/configuration.ht You can also either get the public stream, or get the filtered stream based on keywords. -See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples) \ No newline at end of file +See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples). + +## Unit Test + +Executing integration tests requires users to register a custom application at the +[Twitter Developer Portal](https://developer.twitter.com) and obtain private OAuth credentials. +The listing below shows how to run the complete test suite on a local workstation. 
+ + cd streaming-twitter + env ENABLE_TWITTER_TESTS=1 \ + twitter4j.oauth.consumerKey=${consumer key} \ + twitter4j.oauth.consumerSecret=${consumer secret} \ + twitter4j.oauth.accessToken=${access token} \ + twitter4j.oauth.accessTokenSecret=${access token secret} \ + mvn clean test \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bahir/blob/a45bd842/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala ---------------------------------------------------------------------- diff --git a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index 3f1babd..7d7886d 100644 --- a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -17,26 +17,40 @@ package org.apache.spark.streaming.twitter +import java.util.UUID + +import scala.collection.mutable + import org.scalatest.BeforeAndAfter -import twitter4j.{FilterQuery, Status} +import org.scalatest.concurrent.Eventually +import org.scalatest.time +import org.scalatest.time.Span +import twitter4j.{FilterQuery, Status, TwitterFactory} import twitter4j.auth.{Authorization, NullAuthorization} -import org.apache.spark.SparkFunSuite +import org.apache.spark.ConditionalSparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream -class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging { +class TwitterStreamSuite extends ConditionalSparkFunSuite + with Eventually with BeforeAndAfter with Logging { + def shouldRunTest(): Boolean = sys.env.get("ENABLE_TWITTER_TESTS").contains("1") - val batchDuration = Seconds(1) + var 
ssc: StreamingContext = _ - private val master: String = "local[2]" + before { + ssc = new StreamingContext("local[2]", this.getClass.getSimpleName, Seconds(1)) + } - private val framework: String = this.getClass.getSimpleName + after { + if (ssc != null) { + ssc.stop() + } + } test("twitter input stream") { - val ssc = new StreamingContext(master, framework, batchDuration) val filters = Seq("filter1", "filter2") val query = new FilterQuery().language("fr,es") val authorization: Authorization = NullAuthorization.getInstance() @@ -55,9 +69,44 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) val test7: ReceiverInputDStream[Status] = TwitterUtils.createFilteredStream( ssc, Some(authorization), Some(query), StorageLevel.MEMORY_AND_DISK_SER_2) + } + + testIf("messages received", shouldRunTest) { + val userId = TwitterFactory.getSingleton.updateStatus( + UUID.randomUUID().toString + ).getUser.getId + + val receiveStream = TwitterUtils.createFilteredStream( + ssc, None, Some(new FilterQuery().follow(userId)) + ) + @volatile var receivedMessages: mutable.Set[Status] = mutable.Set() + receiveStream.foreachRDD { rdd => + for (element <- rdd.collect()) { + receivedMessages += element + } + receivedMessages + } + ssc.start() + + val nbOfMsg = 2 + var publishedMessages: List[String] = List() + + (1 to nbOfMsg).foreach( + _ => { + publishedMessages = UUID.randomUUID().toString :: publishedMessages + } + ) - // Note that actually testing the data receiving is hard as authentication keys are - // necessary for accessing Twitter live stream - ssc.stop() + eventually(timeout(Span(15, time.Seconds)), interval(Span(1000, time.Millis))) { + publishedMessages.foreach( + m => if (!receivedMessages.map(m => m.getText).contains(m.toString)) { + TwitterFactory.getSingleton.updateStatus(m) + } + ) + assert( + publishedMessages.map(m => m.toString).toSet + .subsetOf(receivedMessages.map(m 
=> m.getText)) + ) + } } }
