zachtrong opened a new issue, #10217: URL: https://github.com/apache/hudi/issues/10217
**Context**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes

**Problem description**

The latest Apache Hudi release (0.14.0) uses kafka-avro-serializer-5.3.4.jar, which causes deserialization failures when used with the Kafka Avro datasource and Confluent REST API version 6/7.

Sample Avro schema:

```
{"id":36,"subject":"test-value","version":12,"schema":"{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"mongo.bot.test.test\"}","references":[]}
```

The field that triggers the error is **"references": []**, which the 5.3.4 client does not recognize.

**Suggestion**

Upgrade the jar to io.confluent:kafka-avro-serializer:7.5.1 from the Confluent repository.

Reference: [kafka-avro-serializer:7.5.1](https://github.com/confluentinc/schema-registry/blob/v7.5.1/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/SchemaString.java)

**Expected behavior**

Apache Hudi is able to parse the above Avro schema without error.

**Environment Description**

* Hudi version: 0.14.0
* Spark version: 3.4.1
* Hive version: 3.1.3
* Hadoop version: 3.3.4
* Storage (HDFS/S3/GCS..): S3
* Running on Docker? (yes/no): yes

**Additional context**

Add any other context about the problem here.

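To illustrate the failure mode described above, here is a minimal Python sketch of strict versus lenient mapping of a JSON response onto a class that only declares `schema`. It is an analogy only, not the Confluent client or Jackson: the `SchemaString` dataclass, `parse_strict`, `parse_lenient`, and the trimmed `response` payload are all hypothetical stand-ins, on the assumption (taken from the error message) that the old entity knows the single property `schema`.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class SchemaString:
    """Hypothetical stand-in for an entity that declares only `schema`."""
    schema: str


def parse_strict(payload: str) -> SchemaString:
    """Fail on any property the target class does not declare."""
    data = json.loads(payload)
    known = {f.name for f in fields(SchemaString)}
    unknown = sorted(set(data) - known)
    if unknown:
        raise ValueError(f"Unrecognized field(s) {unknown}; known: {sorted(known)}")
    return SchemaString(**data)


def parse_lenient(payload: str) -> SchemaString:
    """Silently drop properties the target class does not declare."""
    data = json.loads(payload)
    known = {f.name for f in fields(SchemaString)}
    return SchemaString(**{k: v for k, v in data.items() if k in known})


# Hypothetical, trimmed response body carrying the extra `references` field.
response = json.dumps(
    {"schema": '{"type":"record","name":"test","fields":[]}', "references": []}
)

try:
    parse_strict(response)
except ValueError as err:
    print("strict parse failed:", err)

print("lenient parse kept:", parse_lenient(response).schema)
```

The strict path mirrors what the report shows: an older client class meets a field added by a newer server and refuses it. Upgrading the client so that its entity declares `references`, as suggested above, removes the mismatch at the source rather than hiding it.
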
**Stacktrace**

```
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 25
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString), not marked as ignorable (one known property: "schema"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2063] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString["references"])
    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1132)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2202)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1705)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1683)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:320)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4730)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3722)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:488)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:235)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107)
    at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.deserialize(KafkaAvroSchemaDeserializer.java:79)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:207)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:136)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:257)
    at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:225)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
