I am trying to convert avro records with field type = bytes to json string 
using Structured Streaming in Spark 2.1. Please see below.


/** Streams Avro-encoded records from a Kafka topic, extracts the `payload`
  * field (Avro type `bytes`) and prints it as text on the console sink.
  */
object AvroConvert {

  /** Row type emitted by the stream: the decoded payload as text. */
  case class KafkaMessage(
                           payload: String
                         )

  /** Writer schema of the incoming records: a single `bytes` field named `payload`. */
  val schemaString =    """{
                            "type" : "record",
                            "name" : "HdfsEvent",
                            "namespace" : "com.abc.def.domain.hdfs",
                            "fields" : [ {
                              "name" : "payload",
                              "type" : {
                                "type" : "bytes",
                                "java-class" : "[B"
                              }
                            } ]
                          }"""
  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)
  // Factory used to build a binary decoder for every Kafka message value.
  val decoder = DecoderFactory.get()
  // Kryo-based encoder for GenericRecord; kept available for map operations
  // that return Avro records directly.
  implicit val encoder: Encoder[GenericRecord] =
    org.apache.spark.sql.Encoders.kryo[GenericRecord]

  /** Starts the streaming query and blocks until it terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val kafkaBroker = "**.**.**.**:9092"
    val inTopic = "avro"

    // Get Spark session
    val session = SparkSession
      .builder
      .master("local[*]")
      .appName("myapp")
      .getOrCreate()

    // Brings the Dataset encoders and the $"col" syntax into scope.
    import session.implicits._

    // NOTE(review): binaryDecoder expects each Kafka value to be raw Avro
    // binary written with exactly `messageSchema`. If the producer adds a
    // header (e.g. a schema-registry prefix) or writes Avro container files,
    // decoding fails with "Malformed data" -- confirm the producer's format.
    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBroker)
      .option("subscribe", inTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .map { bytes =>
        val rec = reader.read(null, decoder.binaryDecoder(bytes, null))
        // The generic reader hands back Avro `bytes` fields as a ByteBuffer,
        // so casting to Byte would throw ClassCastException. Copy the buffer
        // contents out and decode them as text instead.
        val payload = rec.get("payload") match {
          case buf: java.nio.ByteBuffer =>
            // duplicate() leaves the position of the record's own buffer untouched.
            val view = buf.duplicate()
            val raw = new Array[Byte](view.remaining())
            view.get(raw)
            // assumes the payload holds UTF-8 encoded JSON text -- TODO confirm
            new String(raw, java.nio.charset.StandardCharsets.UTF_8)
          case other =>
            // Unexpected runtime type (or null): fall back to its string form.
            String.valueOf(other)
        }
        KafkaMessage(payload)
      }

    val query = data.writeStream
      .outputMode("Append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}


I am getting the below error.

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40

    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)

    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)

    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)

    at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)

    at 
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)

    at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)

    at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)

    at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)

    at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)

    at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)

    at 
com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)

    at 
com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)

    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)

    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)

    at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)

    at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)

    at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)

    at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)

    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)

    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

    at org.apache.spark.scheduler.Task.run(Task.scala:99)

    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)

    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

    at java.lang.Thread.run(Thread.java:745)


I read suggestions to use DataFileReader instead of binaryDecoder, as below, but 
was not successful using this in Scala.


// Datum reader for GenericRecord values, constructed from `schema`.
DatumReader<GenericRecord> datumReader = new 
SpecificDatumReader<GenericRecord>(schema);

// Stream of GenericRecord values built from `inputStream` and the datum reader above.
DataFileStream<GenericRecord> dataFileReader = new 
DataFileStream<GenericRecord>(inputStream, datumReader);


Once the bytes-typed "payload" is converted to JSON, I plan to write it back to 
another Kafka topic.

Any help on this is much appreciated. Thank you!

Revin



Reply via email to