Re: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

2017-05-12 Thread Michael Armbrust
I believe that Avro/Kafka messages have a few bytes at the beginning of the
message to denote which schema is being used. Have you tried using
the KafkaAvroDecoder inside the map instead?
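
A minimal sketch of that idea, assuming the producer used Confluent's wire
format (one magic byte, then a 4-byte schema id, then the Avro binary
payload). decodeConfluentValue is a hypothetical helper; reader would be the
GenericDatumReader from the code quoted below:

import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Sketch under the Confluent wire-format assumption: byte 0 is a magic
// byte (0) and bytes 1-4 hold the schema-registry id, so the actual
// Avro payload starts at offset 5.
def decodeConfluentValue(bytes: Array[Byte],
                         reader: GenericDatumReader[GenericRecord]): GenericRecord = {
  require(bytes.length > 5 && bytes(0) == 0, "not a Confluent-framed message?")
  val decoder = DecoderFactory.get().binaryDecoder(bytes, 5, bytes.length - 5, null)
  reader.read(null, decoder)
}

That would also be consistent with the "Length is negative: -40" below: Avro
reads string/bytes lengths as zig-zag varints, so header bytes misread as a
length easily decode to a negative number (a raw varint of 79, for instance,
zig-zag decodes to -40).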

On Fri, May 12, 2017 at 9:26 AM, Revin Chalil  wrote:

> Just following up on this; would appreciate any responses on this. Thanks.
>
> From: Revin Chalil [mailto:rcha...@expedia.com]
> Sent: Wednesday, May 10, 2017 11:21 PM
> To: user@spark.apache.org
> Subject: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

RE: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1

2017-05-12 Thread Revin Chalil
Just following up on this; would appreciate any responses on this. Thanks.

From: Revin Chalil [mailto:rcha...@expedia.com]
Sent: Wednesday, May 10, 2017 11:21 PM
To: user@spark.apache.org
Subject: Reading Avro messages from Kafka using Structured Streaming in Spark 2.1


I am trying to convert Avro records with a field of type bytes to a JSON string
using Structured Streaming in Spark 2.1. Please see below.


import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.{Encoder, SparkSession}

object AvroConvert {

  case class KafkaMessage(payload: String)

  val schemaString = """{
"type" : "record",
"name" : "HdfsEvent",
"namespace" : "com.abc.def.domain.hdfs",
"fields" : [ {
  "name" : "payload",
  "type" : {
"type" : "bytes",
"java-class" : "[B"
  }
} ]
  }"""
  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)
  // Factory for creating binary decoders
  val decoder = DecoderFactory.get()
  // Register implicit encoder for map operation
  implicit val encoder: Encoder[GenericRecord] =
    org.apache.spark.sql.Encoders.kryo[GenericRecord]

  def main(args: Array[String]) {

    val KafkaBroker = "**.**.**.**:9092"
    val InTopic = "avro"

// Get Spark session
val session = SparkSession
  .builder
  .master("local[*]")
  .appName("myapp")
  .getOrCreate()

// Load streaming data
import session.implicits._

val data = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .load()
  .select($"value".as[Array[Byte]])
  .map(d => {
val rec = reader.read(null, decoder.binaryDecoder(d, null))
val payload = rec.get("payload").asInstanceOf[Byte].toString
new KafkaMessage(payload)
  })

val query = data.writeStream
  .outputMode("Append")
  .format("console")
  .start()

query.awaitTermination()
  }
}


I am getting the below error.

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


I read suggestions to use DataFileReader instead of binaryDecoder, as below, but
was not successful using this in Scala.

DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
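
For reference, a minimal Scala sketch of that route, using DataFileStream (the
InputStream counterpart of DataFileReader). One assumption to flag: this only
works if each Kafka value is a complete Avro container file with the schema
embedded in its header, which plain binary-encoded messages are not.

import java.io.ByteArrayInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Sketch only: treat one Kafka value as a full Avro container file.
// The schema is read from the container header, so none is passed in.
def readContainerFile(bytes: Array[Byte]): Seq[GenericRecord] = {
  val datumReader = new GenericDatumReader[GenericRecord]()
  val stream = new DataFileStream[GenericRecord](new ByteArrayInputStream(bytes), datumReader)
  try {
    val records = scala.collection.mutable.ArrayBuffer[GenericRecord]()
    while (stream.hasNext) records += stream.next()
    records.toSeq
  } finally stream.close()
}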