[
https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dan Dutrow reopened SPARK-12103:
--------------------------------
Please update the Kafka word count examples to comment on what the key field
is. The current examples ignore the key without explanation.
> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
> Key: SPARK-12103
> URL: https://issues.apache.org/jira/browse/SPARK-12103
> Project: Spark
> Issue Type: Improvement
> Components: Documentation, Streaming
> Affects Versions: 1.4.1
> Reporter: Dan Dutrow
> Priority: Minor
> Fix For: 1.4.2
>
>
> (Note: yes, there is a Direct API that may be better, but it's not the
> easiest thing to get started with. The Kafka Receiver API still needs to
> work, especially for newcomers.)
> When creating a receiver stream using KafkaUtils, there is a valid use case
> where you would want to use one (or a few) Kafka Streaming Receivers to pool
> resources. I have 10+ topics and don't want to dedicate 10 cores to
> processing all of them. However, when reading the data produced by
> KafkaUtils.createStream, the DStream[(String,String)] does not properly
> insert the topic name into the tuple. The left key is always null, making it
> impossible to know what topic that data came from other than stashing your
> key into the value. Is there a way around that problem?
> //// CODE
> import kafka.serializer.StringDecoder
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1, "topicD" -> 1,
>   "topicE" -> 1, "topicF" -> 1, ...)
> // Three receivers, each subscribed to every topic
> val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(
>   i =>
>     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>       ssc, consumerProperties,
>       topics,
>       StorageLevel.MEMORY_ONLY_SER))
> val unioned: DStream[(String, String)] = ssc.union(streams)
> unioned.flatMap(x => {
>   val (key, value) = x
>   // key is always null!
>   // value has data from any one of my topics
>   key match ... {
>     ......
>   }
> })
> //// END CODE
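For context, the first element of the tuple returned by KafkaUtils.createStream is the Kafka message key, not the topic name, so it is null whenever the producer publishes without a key. The "stash it into the value" workaround mentioned above could look roughly like the sketch below. TopicTagging, encode and decode are hypothetical names, not part of Spark or Kafka, and the sketch assumes you control the producers and that payloads are plain strings.

```scala
// Hypothetical helper, not part of Spark or Kafka: carries the topic name
// inside the message value so the consumer can recover it.
object TopicTagging {
  // Kafka topic names are limited to [a-zA-Z0-9._-], so a tab cannot occur
  // inside one and is safe to use as the separator.
  private val Sep = '\t'

  /** Producer side: prefix the payload with the topic it is sent to. */
  def encode(topic: String, payload: String): String = s"$topic$Sep$payload"

  /** Consumer side: split on the first separator only, so tabs inside the
    * payload survive. Returns None for values that were never tagged. */
  def decode(value: String): Option[(String, String)] = {
    val i = value.indexOf(Sep)
    if (i < 0) None else Some((value.substring(0, i), value.substring(i + 1)))
  }
}

// Applied to the unioned stream from the report, this might be used as:
//   val byTopic = unioned.flatMap { case (_, value) => TopicTagging.decode(value).toList }
// giving a DStream[(String, String)] of (topic, payload) pairs.
```

Splitting on the first separator keeps the decoding unambiguous without constraining the payload. The cost is that every producer has to apply encode, which may not be an option for topics you do not own.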