[BAHIR-117] Expand filtering options for TwitterInputDStream

Adds a new method to TwitterUtils that enables users to pass an arbitrary FilterQuery down to the TwitterReceiver.
This enables use-cases like receiving Tweets based on location, based on handle, etc. Previously users were only able to receive Tweets based on disjunctive keyword queries.

Closes #43.

Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/86ded930
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/86ded930
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/86ded930

Branch: refs/heads/master
Commit: 86ded930e4af769e8191c8f415fe48193dd4914b
Parents: 68ed2d4
Author: Clemens Wolff <[email protected]>
Authored: Thu May 4 12:52:54 2017 -0700
Committer: Luciano Resende <[email protected]>
Committed: Wed May 24 09:42:53 2017 -0700

----------------------------------------------------------------------
 .../streaming/akka/JavaActorWordCount.java | 2 +-
 .../streaming/twitter/TwitterLocations.scala | 92 ++++++++++++++++++++
 .../streaming/twitter/TwitterInputDStream.scala | 12 ++-
 .../spark/streaming/twitter/TwitterUtils.scala | 46 +++++++++-
 .../twitter/JavaTwitterStreamSuite.java | 4 +
 .../streaming/twitter/TwitterStreamSuite.scala | 5 +-
 6 files changed, 150 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
----------------------------------------------------------------------
diff --git a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
index 740f9f8..abc1f70 100644
--- a/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
+++ b/streaming-akka/examples/src/main/java/org/apache/spark/examples/streaming/akka/JavaActorWordCount.java
@@ -49,7 +49,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
 
   private final String urlOfPublisher;
 
-  public JavaSampleActorReceiver(String urlOfPublisher) {
+  JavaSampleActorReceiver(String urlOfPublisher) {
     this.urlOfPublisher = urlOfPublisher;
   }
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
new file mode 100644
index 0000000..00859fe
--- /dev/null
+++ b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterLocations.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming.twitter
+
+import org.apache.log4j.{Level, Logger}
+import twitter4j.FilterQuery
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+
+/**
+ * Illustrates the use of custom filter queries to get Tweets from one or more locations.
+ */
+object TwitterLocations {
+  def main(args: Array[String]) {
+    if (args.length < 4 || args.length % 4 != 0) {
+      System.err.println("Usage: TwitterLocations <consumer key> <consumer secret> " +
+        "<access token> <access token secret> " +
+        "[<longitude-south-west> <latitude-south-west>" +
+        " <longitude-north-east> <latitude-north-east> ...]")
+      System.exit(1)
+    }
+
+    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
+    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+
+    // Set the system properties so that Twitter4j library used by twitter stream
+    // can use them to generate OAuth credentials
+    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+    System.setProperty("twitter4j.oauth.accessToken", accessToken)
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
+    // Get bounding boxes from command line; Twitter expects (longitude, latitude) corner pairs
+    val locationArgs = args.takeRight(args.length - 4)
+    val boundingBoxes = if (locationArgs.length == 0) {
+      System.out.println("No location bounding boxes specified, using defaults for New York City")
+      val nycSouthWest = Array(-74.0, 40.0)
+      val nycNorthEast = Array(-73.0, 41.0)
+      Array(nycSouthWest, nycNorthEast)
+    } else {
+      locationArgs.map(_.toDouble).sliding(2, 2).toArray
+    }
+
+    val sparkConf = new SparkConf().setAppName("TwitterLocations")
+
+    // check Spark configuration for master URL, set it to local if not configured
+    if (!sparkConf.contains("spark.master")) {
+      sparkConf.setMaster("local[2]")
+    }
+
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    val locationsQuery = new FilterQuery().locations(boundingBoxes : _*)
+
+    // Print Tweets from the specified coordinates
+    // This includes Tweets geo-tagged in the bounding box defined by the coordinates
+    // As well as Tweets tagged in places inside of the bounding box
+    TwitterUtils.createFilteredStream(ssc, None, Some(locationsQuery))
+      .map(tweet => {
+        val latLong = Option(tweet.getGeoLocation).map(l => s"${l.getLatitude},${l.getLongitude}")
+        val place = Option(tweet.getPlace).map(_.getName)
+        val location = latLong.getOrElse(place.getOrElse("(no location)"))
+        val text = tweet.getText.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ')
+        s"$location\t$text"
+      })
+      .print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index bd23a12..81ce60d 100644
--- a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -41,7 +41,7 @@ private[streaming]
 class TwitterInputDStream(
     _ssc: StreamingContext,
     twitterAuth: Option[Authorization],
-    filters: Seq[String],
+    query: Option[FilterQuery],
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[Status](_ssc) {
 
@@ -52,14 +52,14 @@ class TwitterInputDStream(
   private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
 
   override def getReceiver(): Receiver[Status] = {
-    new TwitterReceiver(authorization, filters, storageLevel)
+    new TwitterReceiver(authorization, query, storageLevel)
   }
 }
 
 private[streaming]
 class TwitterReceiver(
     twitterAuth: Authorization,
-    filters: Seq[String],
+    query: Option[FilterQuery],
     storageLevel: StorageLevel
   ) extends Receiver[Status](storageLevel) with Logging {
 
@@ -85,10 +85,8 @@ class TwitterReceiver(
         }
       })
 
-      val query = new FilterQuery
-      if (filters.size > 0) {
-        query.track(filters.mkString(","))
-        newTwitterStream.filter(query)
+      if (query.isDefined) {
+        newTwitterStream.filter(query.get)
       } else {
         newTwitterStream.sample()
       }

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 9cb0106..b0e9b78 100644
--- a/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/streaming-twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.twitter
 
-import twitter4j.Status
+import twitter4j.{FilterQuery, Status}
 import twitter4j.auth.Authorization
 
 import org.apache.spark.storage.StorageLevel
@@ -33,6 +33,25 @@ object TwitterUtils {
    *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
    *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
    *        twitter4j.oauth.accessTokenSecret
+   * @param query A query to get only those tweets that match it, or None for a sample stream
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createFilteredStream(
+      ssc: StreamingContext,
+      twitterAuth: Option[Authorization],
+      query: Option[FilterQuery] = None,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[Status] = {
+    new TwitterInputDStream(ssc, twitterAuth, query, storageLevel)
+  }
+
+  /**
+   * Create a input stream that returns tweets received from Twitter.
+   * @param ssc StreamingContext object
+   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
+   *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   *        twitter4j.oauth.accessTokenSecret
    * @param filters Set of filter strings to get only those tweets that match them
    * @param storageLevel Storage level to use for storing the received objects
    */
@@ -42,7 +61,11 @@ object TwitterUtils {
       filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): ReceiverInputDStream[Status] = {
-    new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+    val query = if (filters.nonEmpty) {
+      Some(new FilterQuery().track(filters.mkString(",")))
+    } else None
+
+    createFilteredStream(ssc, twitterAuth, query, storageLevel)
   }
 
   /**
@@ -129,4 +152,23 @@ object TwitterUtils {
     ): JavaReceiverInputDStream[Status] = {
     createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
   }
+
+  /**
+   * Create a input stream that returns tweets received from Twitter.
+   * @param jssc JavaStreamingContext object
+   * @param twitterAuth Twitter4J authentication, or null to use Twitter4J's default OAuth
+   *        authorization; this uses the system properties twitter4j.oauth.consumerKey,
+   *        twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
+   *        twitter4j.oauth.accessTokenSecret
+   * @param query A query to get only those tweets that match it, or null for a sample stream
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createFilteredStream(
+      jssc: JavaStreamingContext,
+      twitterAuth: Authorization,
+      query: FilterQuery,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Status] = {
+    createFilteredStream(jssc.ssc, Option(twitterAuth), Option(query), storageLevel)
+  }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 26ec8af..e22e24e 100644
--- a/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/streaming-twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.twitter;
 
 import org.junit.Test;
+import twitter4j.FilterQuery;
 import twitter4j.Status;
 import twitter4j.auth.Authorization;
 import twitter4j.auth.NullAuthorization;
@@ -30,6 +31,7 @@ public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
   public void testTwitterStream() {
     String[] filters = { "filter1", "filter2" };
     Authorization auth = NullAuthorization.getInstance();
+    FilterQuery query = new FilterQuery().language("en,es");
 
     // tests the API, does not actually test data receiving
     JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
@@ -40,5 +42,7 @@ public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
     JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
     JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
       auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaDStream<Status> test7 = TwitterUtils.createFilteredStream(ssc,
+      auth, query, StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/86ded930/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index bd23831..3f1babd 100644
--- a/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/streaming-twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.twitter
 
 import org.scalatest.BeforeAndAfter
-import twitter4j.Status
+import twitter4j.{FilterQuery, Status}
 import twitter4j.auth.{Authorization, NullAuthorization}
 
 import org.apache.spark.SparkFunSuite
@@ -38,6 +38,7 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
   test("twitter input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     val filters = Seq("filter1", "filter2")
+    val query = new FilterQuery().language("fr,es")
     val authorization: Authorization = NullAuthorization.getInstance()
 
     // tests the API, does not actually test data receiving
@@ -52,6 +53,8 @@ class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging
       TwitterUtils.createStream(ssc, Some(authorization), filters)
     val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
       ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test7: ReceiverInputDStream[Status] = TwitterUtils.createFilteredStream(
+      ssc, Some(authorization), Some(query), StorageLevel.MEMORY_AND_DISK_SER_2)
 
     // Note that actually testing the data receiving is hard as authentication keys are
     // necessary for accessing Twitter live stream
