Re: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1
I believe that Avro/Kafka messages have a few bytes at the beginning of the message to denote which schema is being used. Have you tried using the KafkaAvroDecoder inside of the map instead?

On Fri, May 12, 2017 at 9:26 AM, Revin Chalil wrote:
> Just following up on this; would appreciate any responses on this. Thanks.
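The "few bytes at the beginning" are, for messages produced with the Confluent Avro serializer, a magic byte (0) followed by a 4-byte big-endian schema-registry ID, and only then the Avro-encoded body. Passing the whole Kafka value to binaryDecoder then misreads the header as Avro data, which would fit the "Length is negative" error below. A minimal sketch of parsing and stripping that header (plain JVM code, no Avro dependency; the layout described is the Confluent wire format, which is an assumption about how these messages were produced):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormat {
    // Confluent wire format: [magic byte 0x0][4-byte schema id][Avro payload]
    static final int HEADER_LEN = 5;

    /** Returns the schema id, or throws if the magic byte is wrong. */
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt(); // ByteBuffer is big-endian by default
    }

    /** The Avro-encoded body with the 5-byte header stripped. */
    static byte[] avroBody(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LEN, message.length);
    }

    public static void main(String[] args) {
        // Fake message: magic 0, schema id 42, then two payload bytes.
        byte[] msg = {0, 0, 0, 0, 42, 7, 9};
        System.out.println(schemaId(msg));        // prints 42
        System.out.println(avroBody(msg).length); // prints 2
    }
}
```

Inside the Spark map this would mean stripping the first five bytes of `d` before calling `decoder.binaryDecoder`, or letting Confluent's KafkaAvroDeserializer do the header parsing and schema-registry lookup for you.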
RE: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1
Just following up on this; would appreciate any responses on this. Thanks.

From: Revin Chalil [mailto:rcha...@expedia.com]
Sent: Wednesday, May 10, 2017 11:21 PM
To: user@spark.apache.org
Subject: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

I am trying to convert Avro records with field type = bytes to a JSON string using Structured Streaming in Spark 2.1. Please see below.

object AvroConvert {

  case class KafkaMessage(
    payload: String
  )

  val schemaString = """{
    "type" : "record",
    "name" : "HdfsEvent",
    "namespace" : "com.abc.def.domain.hdfs",
    "fields" : [ {
      "name" : "payload",
      "type" : {
        "type" : "bytes",
        "java-class" : "[B"
      }
    } ]
  }"""
  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)
  // Binary decoder
  val decoder = DecoderFactory.get()
  // Register implicit encoder for map operation
  implicit val encoder: Encoder[GenericRecord] =
    org.apache.spark.sql.Encoders.kryo[GenericRecord]

  def main(args: Array[String]) {

    val KafkaBroker = "**.**.**.**:9092"
    val InTopic = "avro"

    // Get Spark session
    val session = SparkSession
      .builder
      .master("local[*]")
      .appName("myapp")
      .getOrCreate()

    // Load streaming data
    import session.implicits._

    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("subscribe", InTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .map(d => {
        val rec = reader.read(null, decoder.binaryDecoder(d, null))
        val payload = rec.get("payload").asInstanceOf[Byte].toString
        new KafkaMessage(payload)
      })

    val query = data.writeStream
      .outputMode("Append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

I am getting the below error.

org.apache.avro.AvroRuntimeException: Malformed data.
Length is negative: -40
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
    at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I read suggestions to use DataFileReader instead of binaryDecoder as below, but was not successful using this in Scala.

DatumReader datumReader = new SpecificDatumReader(schema);
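Separate from any extra header bytes, `rec.get("payload").asInstanceOf[Byte]` in the map above is also suspect: for an Avro `bytes` field, GenericDatumReader hands back a `java.nio.ByteBuffer`, not a single `Byte`, so that cast would fail at runtime even once decoding succeeds. A minimal sketch of copying the bytes out of such a buffer (plain JVM code, no Avro dependency; treating the payload as UTF-8 text is an assumption):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadExtract {
    /**
     * Copy the remaining bytes out of the buffer an Avro reader
     * would return for a "bytes" field, and decode them as UTF-8.
     */
    static String payloadAsString(ByteBuffer payload) {
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes); // bulk copy; advances the buffer's position
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(payloadAsString(buf)); // prints hello
    }
}
```

In the Scala map this would look like `val payload = rec.get("payload").asInstanceOf[java.nio.ByteBuffer]` followed by the same copy, instead of the `asInstanceOf[Byte]` cast.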