If I understand correctly, I think you are suggesting that I do this:

val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte],
  StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))

kafkaDStream.map {
  case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
} foreachRDD (rdd => rdd.collect().map {
  case (devId, genericEvent) => println(genericEvent)
})

i.e., I read from Kafka as a byte array => applied a transformation from the
byte array to my custom class => printed the custom class for debugging
purposes. But this is not helping me: I am still getting an empty object with
default values when I print "genericEvent". Please correct me if I did not
get what you are suggesting me to try.
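If I have understood the flatMap part of your suggestion correctly, it would
look roughly like this (just an untested sketch on my side):

import scala.util.Try

kafkaDStream.flatMap {
  case (devId, byteArray) =>
    // parseFrom on an invalid message fails inside Try; toOption turns the
    // failure into None, so flatMap silently drops the bad record
    Try(KafkaGenericEvent.parseFrom(byteArray)).toOption.map(event => (devId, event))
}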
On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:

> I guess what I'm asking is why not start with a byte array like in the
> example that works (using the DefaultDecoder), then map over it and do the
> decoding manually like I'm suggesting below.
>
> Have you tried this approach? We have the same workflow (kafka => protobuf
> => custom class) and it works.
> If you expect invalid messages, you can use flatMap instead and wrap
> .parseFrom in a Try { .... }.toOption.
>
> Sent from my iPhone
>
> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com>
> wrote:
>
> @Adrian,
> I am doing collect for debugging purposes, but I have to use foreachRDD so
> that I can operate on top of this RDD and eventually save to a DB.
>
> But my actual problem here is to properly convert Array[Byte] to my custom
> object.
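>
> Roughly what I have in mind for the DB step is something like this (only a
> sketch; MyDbClient stands in for whatever client we end up using):
>
> val events = kafkaDStream.map {
>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
> }
>
> events.foreachRDD { rdd =>
>   rdd.foreachPartition { partition =>
>     // MyDbClient is a placeholder; one connection per partition keeps
>     // the writes on the executors instead of the driver
>     val db = MyDbClient.connect()
>     partition.foreach { case (devId, event) => db.save(devId, event) }
>     db.close()
>   }
> }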
>
> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> Why are you calling foreachRdd / collect in the first place?
>>
>> Instead of using a custom decoder, you should simply do the following:
>> this is code executed on the workers and allows the computation to
>> continue. ForeachRdd and collect are output operations and force the data
>> to be collected on the driver (assuming you don't want that...)
>>
>> val events = kafkaDStream.map { case (devId, byteArray) =>
>>   KafkaGenericEvent.parseFrom(byteArray)
>> }
>>
>> From: srungarapu vamsi
>> Date: Thursday, September 17, 2015 at 4:03 PM
>> To: user
>> Subject: Spark Streaming kafka directStream value decoder issue
>>
>> I am using KafkaUtils.createDirectStream to read the data from the Kafka
>> bus.
>>
>> On the producer end, I am generating messages in the following way:
>>
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>
>> // Send some messages
>> println("Sending message")
>> val kafkaGenericEvent =
>>   new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
>> val message =
>>   new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
>> producer.send(message)
>>
>> I am connecting to Kafka using the console consumer script and am able to
>> see proper data. The KafkaGenericEvent used in the above code is the class
>> generated by ScalaBuff from a protobuf file.
>>
>> On the consumer end, if I read the value as a plain byte array and then
>> convert it into a KafkaGenericEvent in the following way, I get proper
>> data:
>>
>> val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte],
>>   StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
>>
>> kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>>   case (devId, byteArray) => println(KafkaGenericEvent.parseFrom(byteArray))
>> })
>>
>> But if I change the value type to KafkaGenericEvent and use a custom
>> decoder like this:
>>
>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
>>     extends Decoder[KafkaGenericEvent] {
>>   override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent = {
>>     KafkaGenericEvent.parseFrom(bytes)
>>   }
>> }
>>
>> and in the consumer:
>>
>> val kafkaDStream = KafkaUtils.createDirectStream[String, KafkaGenericEvent,
>>   StringDecoder, KafkaGenericEventsDecoder](ssc, kafkaConf, Set(topics))
>>
>> kafkaDStream foreachRDD (rdd => rdd.collect().map {
>>   case (devId, genericEvent) => println(genericEvent)
>> })
>>
>> then my value object KafkaGenericEvent is not created from the sent data;
>> instead, an empty KafkaGenericEvent object with default values is created.
>>
>> Even if I read the value as an array of bytes in createDirectStream and
>> then apply a transformation in the following way, I get incorrect values:
>>
>> val kafkaDStream = KafkaUtils.createDirectStream[String, Array[Byte],
>>   StringDecoder, DefaultDecoder](ssc, kafkaConf, Set(topics))
>>
>> kafkaDStream.map {
>>   case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>> } foreachRDD (rdd => rdd.collect().map {
>>   case (devId, genericEvent) => println(genericEvent)
>> })
>>
>> I get the default KafkaGenericEvent object in the println(genericEvent)
>> line.
>> Does this mean that I can transform the values only on the driver and not
>> on the executors?
>>
>> I am completely confused here!
>> I am using:
>> scala-2.10.4
>> spark-1.3.1
>> kafka_2.10-0.8.2.1
>>
>> --
>> /Vamsi
>
>
> --
> /Vamsi

--
/Vamsi