[ https://issues.apache.org/jira/browse/SPARK-12103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dan Dutrow updated SPARK-12103:
-------------------------------
    Description:

(Note: yes, there is a Direct API that may be better, but it's not the easiest thing to get started with. The Kafka Receiver API still needs to work, especially for newcomers.)

When creating a receiver stream using KafkaUtils, there is a valid use case where you would want to pool resources. I have 10+ topics and don't want to dedicate 10 cores to processing all of them. However, when reading the data produced by KafkaUtils.createStream, the DStream[(String, String)] does not properly insert the topic name into the tuple. The key (the left element) is always null, making it impossible to know which topic the data came from, other than stashing the topic name into the value. Is there a way around that problem?

//// CODE
val topics = Map("topicA" -> 1, "topicB" -> 1, "topicC" -> 1,
                 "topicD" -> 1, "topicE" -> 1, "topicF" -> 1, ...)

val streams: IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(i =>
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, consumerProperties, topics, StorageLevel.MEMORY_ONLY_SER))

val unioned: DStream[(String, String)] = ssc.union(streams)

unioned.flatMap { x =>
  val (key, value) = x
  // key is always null!
  // value has data from any one of my topics
  key match { ... }
}
//// END CODE
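A sketch of one possible workaround (not part of this issue report; it assumes the standard Spark Streaming 1.x receiver-based Kafka API, and `ssc` / `consumerProperties` are the same placeholders used in the report above): create one receiver stream per topic, tag each record with the topic name it was created for, and union the tagged streams. This recovers the topic identity without relying on the (null) Kafka key:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper, not from the report: builds one receiver stream per
// topic and tags each value with the topic name it was consumed from.
def taggedUnion(ssc: StreamingContext,
                consumerProperties: Map[String, String],
                topicNames: Seq[String]): DStream[(String, String)] = {
  val tagged = topicNames.map { topic =>
    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
        ssc, consumerProperties, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
      // The left element of the tuple now carries the topic name instead
      // of the (null) Kafka message key.
      .map { case (_, value) => (topic, value) }
  }
  ssc.union(tagged)
}
```

Note the trade-off: each receiver occupies a core, so this gives back some of the resource pooling the report is asking for. The Direct API avoids that by exposing the topic through MessageAndMetadata, which is one reason it is the suggested alternative.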
> KafkaUtils createStream with multiple topics -- does not work as expected
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12103
>                 URL: https://issues.apache.org/jira/browse/SPARK-12103
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Dan Dutrow
>             Fix For: 1.0.1
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org