[
https://issues.apache.org/jira/browse/SPARK-13065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15129802#comment-15129802
]
sachin aggarwal commented on SPARK-13065:
-----------------------------------------
Happy to see this; that's exactly what I have added. Have a look at this file to see
how to use the new API for the Java use case:
https://github.com/agsachin/spark/blob/SPARK-13065/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
And for Scala, check this out:
https://github.com/agsachin/spark/blob/SPARK-13065/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
> streaming-twitter pass twitter4j.FilterQuery argument to
> TwitterUtils.createStream()
> ------------------------------------------------------------------------------------
>
> Key: SPARK-13065
> URL: https://issues.apache.org/jira/browse/SPARK-13065
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.6.0
> Environment: all
> Reporter: Andrew Davidson
> Priority: Minor
> Labels: twitter
> Attachments: twitterFilterQueryPatch.tar.gz
>
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> The Twitter streaming API is very powerful and provides a lot of support for
> twitter.com-side filtering of status objects. Whenever possible, we want to
> let Twitter do as much of the work as possible for us.
> Currently the Spark Twitter API only allows you to configure a small subset
> of the possible filters:
> String[] filters = {"tag1", "tag2"};
> JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth,
> filters);
> The current implementation does:
> private[streaming]
> class TwitterReceiver(
>     twitterAuth: Authorization,
>     filters: Seq[String],
>     storageLevel: StorageLevel
>   ) extends Receiver[Status](storageLevel) with Logging {
>   . . .
>     val query = new FilterQuery
>     if (filters.size > 0) {
>       query.track(filters.mkString(","))
>       newTwitterStream.filter(query)
>     } else {
>       newTwitterStream.sample()
>     }
>   ...
> Rather than constructing the FilterQuery object in TwitterReceiver.onStart(), we
> should be able to pass in a FilterQuery object.
> This looks like an easy fix. See the source code links below.
> kind regards
> Andy
> https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60
> https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
> $$$$$$$$$ 2/2/16
> Attached is my Java implementation for this problem. Feel free to reuse it
> however you like. In my streaming Spark app's main() I have the following code:
> FilterQuery query = config.getFilterQuery().fetch();
> if (query != null) {
>     // TODO https://issues.apache.org/jira/browse/SPARK-13065
>     tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
> } /* else
>     spark native api
>     String[] filters = {"tag1", "tag2"};
>     tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
>
>     see
>     https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
>
>     causes
>     val query = new FilterQuery
>     if (filters.size > 0) {
>       query.track(filters.mkString(","))
>       newTwitterStream.filter(query)
>     } */
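
For readers who do not want to open the attachment or the test suites, here is a minimal Java sketch of the design the issue asks for: the caller builds the query and the receiver only decides between filter() and sample(). It is not the patch itself. Query and RecordingStream are hypothetical stand-ins for twitter4j.FilterQuery and the twitter4j stream, and startWithQuery/startWithKeywords are illustrative names, so the sketch compiles without Spark or twitter4j on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterQuerySketch {

    // Hypothetical stand-in for twitter4j.FilterQuery (only what the sketch needs).
    public static final class Query {
        public final List<String> track = new ArrayList<>();
        public final List<String> languages = new ArrayList<>();

        public Query track(String... terms) {
            track.addAll(Arrays.asList(terms));
            return this;
        }

        public Query language(String... langs) {
            languages.addAll(Arrays.asList(langs));
            return this;
        }
    }

    // Hypothetical stand-in for the Twitter stream; it records which call was made.
    public static final class RecordingStream {
        public String lastCall = "none";
        public Query lastQuery;

        public void filter(Query q) { lastCall = "filter"; lastQuery = q; }
        public void sample() { lastCall = "sample"; lastQuery = null; }
    }

    // Proposed shape: the caller supplies the query; null means "no filtering".
    public static void startWithQuery(RecordingStream stream, Query query) {
        if (query != null) {
            stream.filter(query);
        } else {
            stream.sample();
        }
    }

    // Existing keyword-only behaviour, expressed on top of the entry point above.
    // Like the quoted receiver, it joins the keywords into one comma-separated term.
    public static void startWithKeywords(RecordingStream stream, String... filters) {
        Query query = filters.length > 0
                ? new Query().track(String.join(",", filters))
                : null;
        startWithQuery(stream, query);
    }

    public static void main(String[] args) {
        RecordingStream stream = new RecordingStream();

        // A richer query than the keyword-only API can express.
        startWithQuery(stream, new Query().track("spark").language("en"));
        System.out.println(stream.lastCall + " " + stream.lastQuery.languages);

        // No keywords at all falls back to the sample stream.
        startWithKeywords(stream);
        System.out.println(stream.lastCall);
    }
}
```

Keeping the keyword overload as a thin wrapper means existing callers would see no behaviour change, while new callers can set any field the query type supports.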