zachtrong opened a new issue, #10217:
URL: https://github.com/apache/hudi/issues/10217

   **Context**
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   Yes
   
   **Problem description**
   
   The latest Apache Hudi release (0.14.0) bundles kafka-avro-serializer-5.3.4.jar, which causes deserialization failures when it is used with the Kafka Avro source against Confluent Schema Registry REST API version 6/7.
   
   Sample Schema Registry response:
   ```
   {"id":36,"subject":"test-value","version":12,"schema":"{\"type\":\"record\",\"name\":\"test\",\"namespace\":\"test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"mongo.bot.test.test\"}","references":[]}
   ```
   The key issue is the **"references": []** field, which the bundled 5.3.4 client does not recognize.
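   The failure mode can be reproduced outside Hudi. This is a minimal Jackson sketch, not Hudi or Confluent code: `OldSchemaString` is a hypothetical stand-in for the old client's `SchemaString` entity (which only declares a `schema` property), and it assumes `jackson-databind` is on the classpath. Jackson's default strict bean deserialization rejects the `references` field that Schema Registry 6/7 adds to its responses:

   ```java
   import com.fasterxml.jackson.annotation.JsonProperty;
   import com.fasterxml.jackson.databind.DeserializationFeature;
   import com.fasterxml.jackson.databind.ObjectMapper;

   public class SchemaStringRepro {

     // Hypothetical stand-in for the old 5.3.4 entity class,
     // which has no "references" field.
     public static class OldSchemaString {
       @JsonProperty("schema")
       public String schema;
     }

     // Strict parse, as the old client does: unknown fields are fatal.
     public static boolean failsStrict(String json) {
       try {
         new ObjectMapper().readValue(json, OldSchemaString.class);
         return false;
       } catch (Exception e) {
         return true; // UnrecognizedPropertyException on "references"
       }
     }

     // Newer clients both model "references" and tolerate unknown fields.
     public static String parseLeniently(String json) throws Exception {
       ObjectMapper lenient = new ObjectMapper()
           .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
       return lenient.readValue(json, OldSchemaString.class).schema;
     }

     public static void main(String[] args) throws Exception {
       String response = "{\"schema\":\"{}\",\"references\":[]}";
       System.out.println("strict fails: " + failsStrict(response));
       System.out.println("lenient schema: " + parseLeniently(response));
     }
   }
   ```

   The 7.x `SchemaString` declares the `references` property explicitly, which is why upgrading the client resolves the error.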
   
   **Suggestion**
   Upgrade the dependency to io.confluent:kafka-avro-serializer:7.5.1 from the Confluent repository.
   
   Reference: 
[kafka-avro-serializer:7.5.1](https://github.com/confluentinc/schema-registry/blob/v7.5.1/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/SchemaString.java)
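   As a possible interim workaround until Hudi upgrades its bundled jar, a job's own build could pin the newer client. This is a sketch of a Maven override, assuming the Confluent artifacts are resolved from Confluent's public Maven repository; compatibility with the Hudi utilities bundle on the classpath would need to be verified:

   ```xml
   <!-- Confluent artifacts are served from Confluent's own repository. -->
   <repositories>
     <repository>
       <id>confluent</id>
       <url>https://packages.confluent.io/maven/</url>
     </repository>
   </repositories>

   <!-- Pin the newer client ahead of the version Hudi bundles. -->
   <dependencies>
     <dependency>
       <groupId>io.confluent</groupId>
       <artifactId>kafka-avro-serializer</artifactId>
       <version>7.5.1</version>
     </dependency>
   </dependencies>
   ```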
   
   **Expected behavior**
   
   Apache Hudi parses the above Schema Registry response without error.
   
   **Environment Description**
   
   * Hudi version: 0.14.0
   
   * Spark version: 3.4.1
   
   * Hive version: 3.1.3
   
   * Hadoop version: 3.3.4
   
   * Storage (HDFS/S3/GCS..): S3
   
   * Running on Docker? (yes/no): yes
   
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 25
   Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString), not marked as ignorable (one known property: "schema"])
    at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2063] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString["references"])
          at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
          at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1132)
          at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2202)
          at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1705)
          at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1683)
          at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:320)
          at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
          at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
          at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4730)
          at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3722)
          at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
          at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
          at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:495)
          at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:488)
          at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
          at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
          at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:235)
          at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107)
          at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.deserialize(KafkaAvroSchemaDeserializer.java:79)
          at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
          at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
          at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
          at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1386)
          at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:133)
          at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1617)
          at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1453)
          at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:686)
          at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:637)
          at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1303)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
          at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:207)
          at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:136)
          at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:40)
          at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:39)
          at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:219)
          at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:257)
          at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:225)
          at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
          at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
          at scala.collection.Iterator.foreach(Iterator.scala:943)
          at scala.collection.Iterator.foreach$(Iterator.scala:943)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
          at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
          at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
          at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
          at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
          at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
          at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
          at scala.collection.AbstractIterator.to(Iterator.scala:1431)
          at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
          at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
          at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
          at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
          at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
          at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
          at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1462)
          at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
          at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
          at org.apache.spark.scheduler.Task.run(Task.scala:139)
          at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
          at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
          at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   

