[ 
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-12103.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 1.6.1
                   2.0.0

Issue resolved by pull request 10132
[https://github.com/apache/spark/pull/10132]

> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 1.4.1
>            Reporter: Dan Dutrow
>            Priority: Minor
>             Fix For: 2.0.0, 1.6.1
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the 
> easiest thing to get started with. The Kafka Receiver API still needs to 
> work, especially for newcomers)
> When creating a receiver stream using KafkaUtils, there is a valid use case 
> where you would want to use one (or a few) Kafka Streaming Receiver to pool 
> resources. I have 10+ topics and don't want to dedicate 10 cores to 
> processing all of them. However, when reading the data procuced by 
> KafkaUtils.createStream, the DStream[(String,String)] does not properly 
> insert the topic name into the tuple. The left-key always null, making it 
> impossible to know what topic that data came from other than stashing your 
> key into the value.  Is there a way around that problem?
> //// CODE
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1, 
> "topicE" -> 1, "topicF" -> 1, ...)
> val streams : IndexedSeq[ReceiverInputDStream[(String,String] = (1 to 3).map( 
> i =>
>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>     ssc, consumerProperties,
>     topics,
>     StorageLevel.MEMORY_ONLY_SER))
> val unioned :DStream[(String,String)] = ssc.union(streams)
> unioned.flatMap(x => {
>    val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>       ......
>   }
> }
> //// END CODE



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to