If I understand correctly, you are suggesting that I do this:

    val kafkaDStream =
      KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
        ssc, kafkaConf, Set(topics))

    kafkaDStream.map {
      case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
    }.foreachRDD(rdd => rdd.collect().map {
      case (devId, genericEvent) =>
        println(genericEvent)
    })

I read from Kafka as a byte array, applied a transformation from the byte array
to my custom class, and printed the custom class for debugging.

But this is not helping me, i.e. I am getting an empty object with default
values when I print "genericEvent".
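
As a next sanity check (just a sketch on my side, using the same kafkaDStream as
above), I can log the raw payload size before calling parseFrom, to tell an empty
payload apart from a decoding problem:

    // Debug sketch only: print the raw payload size before parsing,
    // so an empty/garbled payload can be told apart from a parse failure.
    kafkaDStream.foreachRDD { rdd =>
      rdd.collect().foreach { case (devId, byteArray) =>
        println(s"devId=$devId, payload size=${byteArray.length} bytes")
        println(KafkaGenericEvent.parseFrom(byteArray))
      }
    }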

Please correct me if I did not understand what you are suggesting I try.


On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase <atan...@adobe.com> wrote:

> I guess what I'm asking is: why not start with a byte array, like in the
> example that works (using the DefaultDecoder), then map over it and do the
> decoding manually as I'm suggesting below.
>
> Have you tried this approach? We have the same workflow (kafka => protobuf
> => custom class) and it works.
> If you expect invalid messages, you can use flatMap instead and wrap
> .parseFrom in a Try {....} .toOption.
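> Something along these lines (a rough sketch, assuming parseFrom throws on
> malformed input):
>
> import scala.util.Try
>
> // Keep only the messages that parse successfully, dropping the rest.
> val events = kafkaDStream.flatMap { case (devId, byteArray) =>
>   Try(KafkaGenericEvent.parseFrom(byteArray)).toOption.map(event => (devId, event))
> }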
>
> Sent from my iPhone
>
> On 17 Sep 2015, at 18:23, srungarapu vamsi <srungarapu1...@gmail.com>
> wrote:
>
> @Adrian,
> I am doing the collect only for debugging. But I have to use foreachRDD so
> that I can operate on the RDD and eventually save the data to a DB.
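>
> Roughly, the save step I have in mind looks like this (only a sketch;
> saveToDB is a hypothetical placeholder for my actual DB write):
>
> // Sketch only: saveToDB stands in for the real DB write logic.
> val events = kafkaDStream.map { case (devId, bytes) => (devId, KafkaGenericEvent.parseFrom(bytes)) }
> events.foreachRDD { rdd =>
>   rdd.foreachPartition { iter =>
>     iter.foreach { case (devId, genericEvent) => saveToDB(devId, genericEvent) }
>   }
> }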
>
> But my actual problem here is properly converting the Array[Byte] into my
> custom object.
>
> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> Why are you calling foreachRDD / collect in the first place?
>>
>> Instead of using a custom decoder, you can simply do the following – this is
>> code executed on the workers and it allows the computation to continue as a
>> normal transformation. foreachRDD and collect are output operations and force
>> the data to be collected on the driver (assuming you don’t want that…)
>>
>> val events = kafkaDStream.map { case (devId, byteArray) =>
>>   KafkaGenericEvent.parseFrom(byteArray) }
>>
>> From: srungarapu vamsi
>> Date: Thursday, September 17, 2015 at 4:03 PM
>> To: user
>> Subject: Spark Streaming kafka directStream value decoder issue
>>
>> I am using KafkaUtils.createDirectStream to read the data from the Kafka bus.
>>
>> On the producer end, I am generating messages in the following way:
>>
>>     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>       "org.apache.kafka.common.serialization.StringSerializer")
>>     val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>
>>     // Send some messages
>>     println("Sending message")
>>     val kafkaGenericEvent = new KafkaGenericEvent("event-id", EventType.site, "6", 1440500400000L)
>>     val message = new ProducerRecord[String, KafkaGenericEvent](topic, "myKey", kafkaGenericEvent)
>>     producer.send(message)
>>
>> I am connecting to Kafka using the console consumer script and am able to
>> see proper data. The KafkaGenericEvent used in the above code is the class
>> generated by ScalaBuff from a protobuf file.
>>
>> On the consumer end,
>> if I read the value as a plain byte array and then convert it into a
>> KafkaGenericEvent in the following way, I get proper data:
>>
>>     val kafkaDStream =
>>       KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>         ssc, kafkaConf, Set(topics))
>>
>>     kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>>       case (devId, byteArray) =>
>>         println(KafkaGenericEvent.parseFrom(byteArray))
>>     })
>>
>> But if I change the value type to KafkaGenericEvent and use a custom decoder
>> like this:
>>
>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
>>   extends Decoder[KafkaGenericEvent] {
>>   override def fromBytes(bytes: Array[Byte]): KafkaGenericEvent =
>>     KafkaGenericEvent.parseFrom(bytes)
>> }
>>
>> and in the consumer:
>>
>>     val kafkaDStream =
>>       KafkaUtils.createDirectStream[String, KafkaGenericEvent, StringDecoder, KafkaGenericEventsDecoder](
>>         ssc, kafkaConf, Set(topics))
>>
>>     kafkaDStream.foreachRDD(rdd => rdd.collect().map {
>>       case (devId, genericEvent) =>
>>         println(genericEvent)
>>     })
>>
>> Now my value object KafkaGenericEvent is not created from the sent data;
>> instead I get an empty KafkaGenericEvent object with default values.
>>
>> Even if I read the value as an array of bytes in createDirectStream and then
>> apply a transformation in the following way, I am getting incorrect values:
>>
>>     val kafkaDStream =
>>       KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>         ssc, kafkaConf, Set(topics))
>>
>>     kafkaDStream.map {
>>       case (devId, byteArray) => (devId, KafkaGenericEvent.parseFrom(byteArray))
>>     }.foreachRDD(rdd => rdd.collect().map {
>>       case (devId, genericEvent) =>
>>         println(genericEvent)
>>     })
>>
>> I get the default KafkaGenericEvent object in the println(genericEvent) line.
>> Does this mean that I can transform the values only on the driver and not
>> on the executors?
>>
>> I am completely confused here!
>> I am using:
>>  scala-2.10.4
>>  spark-1.3.1
>>  kafka_2.10-0.8.2.1
>>
>> -
>> /Vamsi
>>
>
>
>
> --
> /Vamsi
>
>


-- 
/Vamsi
