If you want an "easy" but not particularly performant way to do it,
each org.apache.kafka.clients.consumer.ConsumerRecord
has a topic.

The topic is going to be the same for the entire partition as long as you
haven't shuffled, hence the examples on how to deal with it at a partition
level.

On Fri, Sep 8, 2017 at 8:29 PM, Dan Dong <dongda...@gmail.com> wrote:

> Hi,Alonso.
>   Thanks! I've read about this but did not quite understand it. To pick
> out the topic name of a kafka message seems a simple task but the example
> code looks so complicated with redundent info. Why do we need offsetRanges
> here and do we have a easy way to achieve this?
>
> Cheers,
> Dan
>
>
> 2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman <alons...@gmail.com>:
>
>> Hi, reading the official doc
>> <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>,
>> i think you can do it this way:
>>
>> import org.apache.spark.streaming.kafka._
>>
>>    val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](
>>
>>       ssc, kafkaParams, topicsSet)
>>
>>
>>  // Hold a reference to the current offset ranges, so it can be used 
>> downstream
>>  var offsetRanges = Array.empty[OffsetRange]
>>
>>  directKafkaStream.transform { rdd =>
>>    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>    rdd
>>  }.map {
>>            ...
>>  }.foreachRDD { rdd =>
>>    for (o <- offsetRanges) {
>>      println(*s"${o.topic}* ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>>    }
>>
>>  }
>>
>>
>> 2017-09-06 14:38 GMT+02:00 Dan Dong <dongda...@gmail.com>:
>>
>>> Hi, All,
>>>   I have one issue here about how to process multiple Kafka topics in a
>>> Spark 2.* program. My question is: How to get the topic name from a message
>>> received from Kafka? E.g:
>>>
>>> ......
>>>     val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](
>>>       ssc, kafkaParams, topicsSet)
>>>
>>>     // Get the lines, split them into words, count the words and print
>>>     val lines = messages.map(_._2)
>>>     val words = lines.flatMap(_.split(" "))
>>>     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>>     wordCounts.print()
>>> ......
>>>
>>> Kafka send the messages in multiple topics through console producer for
>>> example. But when Spark receive the message, how it will know which topic
>>> is this piece of message coming from? Thanks a lot for any of your helps!
>>>
>>> Cheers,
>>> Dan
>>>
>>
>>
>>
>> --
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>
>
>

Reply via email to