sathyaprakashg opened a new issue #1971:
URL: https://github.com/apache/hudi/issues/1971


   Hi, I am facing the issues below in DeltaStreamer, due to schema evolution in Kafka. In the schema registry, version 1 of our schema has 52 attributes. We added 2 additional attributes with default values in version 2, but our producer is still sending events using the version 1 schema id.
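
   For concreteness, the evolution looks roughly like the sketch below (the field names are made-up stand-ins, with 2 fields standing in for all 52 existing attributes): version 2 only appends fields that carry defaults, so the registry accepts it as a backward-compatible change.

   ```
   {
     "type": "record",
     "name": "Event",
     "fields": [
       { "name": "existing_attr_1", "type": "string" },
       { "name": "existing_attr_2", "type": "string" },
       { "name": "added_attr_53", "type": ["null", "string"], "default": null },
       { "name": "added_attr_54", "type": ["null", "string"], "default": null }
     ]
   }
   ```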
   
   ### Problem 1
   I ran DeltaStreamer with the config `hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/latest` and got the error below:
   
   ```
   20/08/16 12:35:05 ERROR io.HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=12345 partitionPath=Jan}, currentLocation='null', newLocation='null'}
   java.io.EOFException
           at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
           at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
           at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
           at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
           at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:119)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
           at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
           at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   I debugged the Hudi code and found that the RDD's generic records have a schema with only 52 fields, since the producer is still producing with version 1. The RDD's schema is the one used to convert the Avro records to bytes in HoodieAvroUtils.avroToBytes.

   But to convert them back from bytes to Avro, HoodieAvroUtils.bytesToAvro uses the schema provided in the property file. Since we specified the latest version in `hoodie.deltastreamer.schemaprovider.registry.url`, and that schema is not the same as the one used to convert to bytes, we get the error above.
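
   The failure mode is easy to reproduce outside Hudi. Below is a minimal, self-contained sketch (tiny stand-in schemas and a made-up class name, not actual Hudi code): decoding with only the latest schema misreads the bytes and dies with the same EOFException, while resolving the writer (v1) schema against the reader (v2) schema succeeds and fills the new field from its default.

   ```
   import java.io.ByteArrayOutputStream;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class SchemaMismatchDemo {
     // Stand-ins for the 52-field (v1, writer) and 54-field (v2, reader) schemas.
     static final Schema V1 = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
         + "{\"name\":\"a\",\"type\":\"string\"}]}");
     static final Schema V2 = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
         + "{\"name\":\"a\",\"type\":\"string\"},"
         + "{\"name\":\"b\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

     public static void main(String[] args) throws Exception {
       GenericRecord rec = new GenericData.Record(V1);
       rec.put("a", "hello");

       // Serialize with the writer (v1) schema, as avroToBytes does with the RDD schema.
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(V1).write(rec, encoder);
       encoder.flush();
       byte[] bytes = out.toByteArray();

       // Decoding with v2 alone: after field "a" is read, the decoder tries to
       // read the union index for "b" and runs off the end of the buffer.
       try {
         BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
         new GenericDatumReader<GenericRecord>(V2).read(null, decoder);
       } catch (java.io.EOFException e) {
         System.out.println("reader-only decode failed: " + e);
       }

       // Resolving writer (v1) against reader (v2): the added field gets its default.
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
       GenericRecord back = new GenericDatumReader<GenericRecord>(V1, V2).read(null, decoder);
       System.out.println(back); // {"a": "hello", "b": null}
     }
   }
   ```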
   
   I noticed there is a `targetUrl` property; when I specified version 1 there, the job started running fine. But the problem is that once the producer starts publishing events with the latest version, we have to manually change the version in `targetUrl`. My suggestion: could we use the RDD schema for both HoodieAvroUtils.avroToBytes and HoodieAvroUtils.bytesToAvro, so that we can be sure the same schema is used for both write and read? A sketch of what that could look like follows the config below.
   
   
   ```
   hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/latest
   hoodie.deltastreamer.schemaprovider.registry.targetUrl=https://schema-registry.url.net/subjects/topicname-value/versions/1
   ```
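
   A minimal sketch of the suggestion, assuming a hypothetical two-schema variant of bytesToAvro (this is not the current Hudi API): the writer schema would come from the record itself and the reader schema from the registry, so Avro's schema resolution fills added fields from their defaults.

   ```
   import java.io.IOException;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.DecoderFactory;

   public class TwoSchemaBytesToAvro {
     // Hypothetical two-schema bytesToAvro (sketch only, not Hudi's method):
     // writerSchema describes how the bytes were written (the RDD schema);
     // readerSchema is the target (latest registry) schema.
     public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema,
                                             Schema readerSchema) throws IOException {
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
       return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
           .read(null, decoder);
     }
   }
   ```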
   
   ### Problem 2
   After setting the `targetUrl` property as mentioned above, the Hudi DeltaStreamer was working fine. We then tried to add a transformation by passing `--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer` and got the error below:
   
   ```
   java.lang.ArrayIndexOutOfBoundsException: 52
           at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
           at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:69)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
           at scala.collection.Iterator.isEmpty(Iterator.scala:385)
           at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
           at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:47)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   This is because we have 54 converters in [AvroConversionHelper](https://github.com/apache/hudi/blob/4226d7514400d86761e39639e9554809b72b627c/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala#L170), as per the latest schema, but only 52 fields in the RDD record.
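
   The same out-of-bounds access can be reproduced with tiny stand-in schemas (2 and 3 fields in place of 52 and 54; the class name is made up):

   ```
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;

   public class ConverterCountDemo {
     public static void main(String[] args) {
       // Stand-ins for schema version 1 (52 fields) and version 2 (54 fields).
       Schema v1 = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
           + "{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}");
       Schema v2 = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
           + "{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"},"
           + "{\"name\":\"c\",\"type\":\"int\",\"default\":0}]}");

       // The incoming record only has v1's two slots...
       GenericData.Record rec = new GenericData.Record(v1);
       rec.put("a", 1);
       rec.put("b", 2);

       // ...but one converter is applied per field of the latest (v2) schema,
       // so the last positional lookup falls off the end of the record.
       for (Schema.Field f : v2.getFields()) {
         // f.pos() == 2 -> ArrayIndexOutOfBoundsException: 2 (here; 52 in the issue)
         System.out.println(f.name() + " = " + rec.get(f.pos()));
       }
     }
   }
   ```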
   
   If we set `hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/1`, then it works fine.
   
   So, in general, is it possible to set the latest schema in the property file and have DeltaStreamer run without issues? Or, even better, can we avoid specifying the schema registry URL in the property file at all and instead use the schema from the RDD?
   
   **Environment Description**
   
   * Hudi version : 0.5.3 & 0.6.0 rc1
   
   * Spark version : 2.4.4
   
   * Running on Docker? (yes/no) : no
   
   
   

