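I believe that Avro messages written to Kafka with a schema-registry
serializer have a few bytes at the beginning of the message that denote which
schema is being used. A plain binaryDecoder would read those bytes as data,
which could explain the negative length. Have you tried using the
KafkaAvroDecoder inside the map instead?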
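
If you would rather keep your own GenericDatumReader, here is a minimal sketch of skipping that prefix by hand. It assumes the producer used the Confluent serializer, whose wire format is one magic byte (0) followed by a 4-byte big-endian schema ID; if your producer writes something else, this will not apply. The names ConfluentHeader and split are made up for illustration.

```scala
import java.nio.ByteBuffer

object ConfluentHeader {
  // Assumed Confluent wire format: 1 magic byte (0x0) + 4-byte big-endian schema ID.
  val MagicByte: Byte = 0x0
  val HeaderSize: Int = 5

  /** Returns Some((schemaId, avroBody)) if the assumed header is present, else None. */
  def split(message: Array[Byte]): Option[(Int, Array[Byte])] =
    if (message == null || message.length < HeaderSize || message(0) != MagicByte) None
    else {
      // ByteBuffer is big-endian by default; read 4 bytes starting at offset 1.
      val schemaId = ByteBuffer.wrap(message, 1, 4).getInt
      // Everything after the header is the Avro-encoded record body.
      Some((schemaId, message.drop(HeaderSize)))
    }
}
```

Inside the map you would then pass the returned body, rather than d, to decoder.binaryDecoder. Separately, the generic Avro API hands back a bytes field as a java.nio.ByteBuffer, so the asInstanceOf[Byte] cast in your code would probably need to change as well.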

On Fri, May 12, 2017 at 9:26 AM, Revin Chalil <rcha...@expedia.com> wrote:

> Just following up on this; would appreciate any responses on this. Thanks.
>
>
>
> *From:* Revin Chalil [mailto:rcha...@expedia.com]
> *Sent:* Wednesday, May 10, 2017 11:21 PM
> *To:* user@spark.apache.org
> *Subject:* Reading Avro messages from Kafka using Structured Streaming in
> Spark 2.1
>
>
>
> I am trying to convert Avro records with field type = bytes to a JSON string
> using Structured Streaming in Spark 2.1. Please see below.
>
>
>
> object AvroConvert {
>
>   case class KafkaMessage(
>                            payload: String
>                          )
>
>   val schemaString =    """{
>                             "type" : "record",
>                             "name" : "HdfsEvent",
>                             "namespace" : "com.abc.def.domain.hdfs",
>                             "fields" : [ {
>                               "name" : "payload",
>                               "type" : {
>                                 "type" : "bytes",
>                                 "java-class" : "[B"
>                               }
>                             } ]
>                           }"""
>   val messageSchema = new Schema.Parser().parse(schemaString)
>   val reader = new GenericDatumReader[GenericRecord](messageSchema)
>   // Binary decoder
>   val decoder = DecoderFactory.get()
>   // Register implicit encoder for map operation
>   implicit val encoder: Encoder[GenericRecord] =
>     org.apache.spark.sql.Encoders.kryo[GenericRecord]
>
>   def main(args: Array[String]) {
>
>     val KafkaBroker = "**.**.**.**:9092";
>     val InTopic = "avro";
>
>     // Get Spark session
>     val session = SparkSession
>       .builder
>       .master("local[*]")
>       .appName("myapp")
>       .getOrCreate()
>
>     // Load streaming data
>     import session.implicits._
>
>     val data = session
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", KafkaBroker)
>       .option("subscribe", InTopic)
>       .load()
>       .select($"value".as[Array[Byte]])
>       .map(d => {
>         val rec = reader.read(null, decoder.binaryDecoder(d, null))
>         val payload = rec.get("payload").asInstanceOf[Byte].toString
>         new KafkaMessage(payload)
>       })
>
>     val query = data.writeStream
>       .outputMode("Append")
>       .format("console")
>       .start()
>
>     query.awaitTermination()
>   }
> }
>
>
>
>
>
> I am getting the below error.
>
> org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
>     at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>     at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
>     at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> I read suggestions to use DataFileReader instead of binaryDecoder, as below,
> but was not successful using this in Scala.
>
>
>
> DatumReader<GenericRecord> datumReader =
>     new SpecificDatumReader<GenericRecord>(schema);
>
> DataFileStream<GenericRecord> dataFileReader =
>     new DataFileStream<GenericRecord>(inputStream, datumReader);
>
>
>
>
>
> Once the Byte type “payload” is converted to JSON, I plan to write it back to
> another Kafka topic.
>
>
>
> Any help on this is much appreciated. Thank you!
>
>
>
> Revin
>
>
>
>
>
>
>
