sathyaprakashg opened a new issue #1971: URL: https://github.com/apache/hudi/issues/1971
Hi, I am facing the issues below in DeltaStreamer due to schema evolution in Kafka. In the schema registry, version 1 has 52 attributes. We added 2 additional attributes with default values in version 2, but our producer is still sending events using the version 1 schema id.

### Problem 1

I ran DeltaStreamer with this config:

```
hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/latest
```

and got the error below:

```
20/08/16 12:35:05 ERROR io.HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=12345 partitionPath=Jan}, currentLocation='null', newLocation='null'}
java.io.EOFException
	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
	at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:119)
	at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
	at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
	at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

I debugged the Hudi code and found that the RDD's generic records carry a schema with only 52 fields, since the producer is still producing with version 1. That RDD schema is used to serialize records to Avro bytes in HoodieAvroUtils.avroToBytes, but to convert them back from bytes to Avro, HoodieAvroUtils.bytesToAvro uses the schema provided in the property file. Since we specified the latest version in `hoodie.deltastreamer.schemaprovider.registry.url`, and that schema is not the same as the one used for serialization, we get the error above. I noticed there is a `targetUrl` property; once I pointed it at version 1, the job started running fine.
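For what it's worth, this `EOFException` is the classic symptom of decoding Avro bytes with a single schema that differs from the writer's: Avro *can* resolve v1-written bytes against v2 (the two new fields have defaults), but only when the datum reader is constructed with both the writer and the reader schema. A minimal standalone sketch of the distinction, with hypothetical `toBytes`/`fromBytes` helpers rather than Hudi's actual signatures:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionSketch {

  // Serialize with the schema the record actually carries (v1 here),
  // analogous to what happens on the write side.
  static byte[] toBytes(GenericRecord record, Schema writerSchema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  // Deserialize with BOTH schemas: Avro's ResolvingDecoder then fills the
  // two added fields from their v2 defaults instead of trying to read them
  // from the byte stream and running off its end.
  static GenericRecord fromBytes(byte[] bytes, Schema writerSchema, Schema readerSchema)
      throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
        .read(null, decoder);
  }
}
```

With only a single-schema reader (the equivalent of using the v2 registry schema as both writer and reader), the decoder expects the extra fields to be present in the bytes and hits `BinaryDecoder.ensureBounds`, which matches the stack trace above.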
The problem with this workaround is that once the producer starts publishing events with the latest version, we have to manually change the version in `targetUrl`. My suggestion: could we use the RDD's schema for both HoodieAvroUtils.avroToBytes and HoodieAvroUtils.bytesToAvro, so that we can be sure the same schema is used for both write and read?

```
hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/latest
hoodie.deltastreamer.schemaprovider.registry.targetUrl=https://schema-registry.url.net/subjects/topicname-value/versions/1
```

### Problem 2

After setting the `targetUrl` property as mentioned above, DeltaStreamer was working fine. We then tried to add a transformation by passing `--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer` and got:

```
java.lang.ArrayIndexOutOfBoundsException: 52
	at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:69)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.isEmpty(Iterator.scala:385)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
	at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:47)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

This is because [AvroConversionHelper](https://github.com/apache/hudi/blob/4226d7514400d86761e39639e9554809b72b627c/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala#L170) builds 54 converters from the latest schema, but each RDD record has only 52 fields (see the small repro sketched below). If we set `hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/1`, then it works fine.

So, in general, is it possible to set the latest schema in the property file and have DeltaStreamer run without issues? Or, even better, could we avoid specifying the schema registry URL in the property file altogether and use the schema from the RDD instead?
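As referenced under Problem 2, here is a small standalone repro of the out-of-bounds access: a record built against a narrow schema, accessed with an index derived from a wider one, which is what 54 converters do to a 52-field record (the schema and field names here are made up):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

public class ConverterIndexSketch {
  public static void main(String[] args) {
    // Stand-in for the 52-field v1 schema the producer still writes with.
    Schema v1 = SchemaBuilder.record("event").fields()
        .requiredString("a")
        .requiredString("b")
        .endRecord();

    GenericData.Record record = new GenericData.Record(v1);
    record.put("a", "x");
    record.put("b", "y");

    // A converter generated from a wider (v2-like) schema indexes one past
    // the record's backing array: the analogue of index 52 in the trace.
    record.get(2); // java.lang.ArrayIndexOutOfBoundsException: 2
  }
}
```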
**Environment Description**

* Hudi version : 0.5.3 & 0.6.0-rc1
* Spark version : 2.4.4
* Running on Docker? (yes/no) : no
