sathyaprakashg opened a new issue #1972:
URL: https://github.com/apache/hudi/issues/1972


   I am getting the error below when using DeltaStreamer with a transformer 
(`--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer`) 
against a Kafka source. 
   
   ```
   ERROR io.HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=CA}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException: 25
           at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
           at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
           at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:119)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
           at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
           at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   I see this is caused by an incompatibility between the schema used to 
convert Avro to bytes (`HoodieAvroUtils.avroToBytes`) and the schema used to 
convert bytes back to Avro (`HoodieAvroUtils.bytesToAvro`). 
   
   For example, below is the schema used to convert Avro to bytes. It is 
created from the Spark data type in this 
[method](https://github.com/apache/hudi/blob/master/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala#L342). 
That method uses 
[SchemaConverters](https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L188), 
which always seems to add null as the second type in the union: 
`Schema.createUnion(schema, nullSchema)`
   
   ```
   {
     "type": "record",
     "name": "hoodie_source",
     "namespace": "hoodie.source",
     "fields": [
       {
         "name": "field1",
         "type": [
           "double",
           "null"
         ]
       },
       {
         "name": "field2",
         "type": [
           "double",
           "null"
         ]
       }
     ]
   }
   ```
   
   Below is the schema used to convert bytes back to Avro. It is retrieved 
from the schema registry configured in 
`hoodie.deltastreamer.schemaprovider.registry.url`.
   ```
   {
     "type": "record",
     "name": "MyRecord",
     "namespace": "com.xyz.abc",
     "fields": [
       {
         "name": "field1",
         "type": [
           "null",
           "double"
         ],
         "default": null
       },
       {
         "name": "field2",
         "type": [
           "null",
           "double"
         ],
         "default": null
       }
     ]
   }
   ```
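   As we can see, the first schema declares each field as a union of double 
and null, while the second declares it as null and double. Avro's binary 
encoding writes a union value as the position of its branch, so the schema 
used to decode the bytes must list the union branches in the same order as 
the schema used to encode them. Since the orders differ here, Hudi is not 
able to convert the bytes back to Avro.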
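
To make the failure mode concrete, here is a minimal sketch in plain Python. It does not use Hudi or the Avro library: `encode_record`, `decode_record`, `WRITER_UNION` and `READER_UNION` are hypothetical names, and the encoder is hand-rolled from the Avro binary encoding rules (a union is written as a zigzag varint branch index followed by the value, a double as 8 little-endian bytes, null as zero bytes). It only illustrates why decoding with a differently ordered union can fail; it is not how Hudi implements `avroToBytes` or `bytesToAvro`.

```python
import struct

# Union orders taken from the two schemas in this report.
WRITER_UNION = ["double", "null"]   # order in the schema built from the Spark type
READER_UNION = ["null", "double"]   # order in the schema from the registry


def encode_long(n: int) -> bytes:
    """Avro long: zigzag encoding followed by a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf: bytes, pos: int):
    """Read one zigzag varint starting at pos; return (value, new_pos)."""
    z, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1), pos


def encode_record(values, union):
    """Encode nullable double fields: branch index, then the value if any."""
    out = bytearray()
    for v in values:
        branch = "null" if v is None else "double"
        out += encode_long(union.index(branch))
        if branch == "double":
            out += struct.pack("<d", v)
    return bytes(out)


def decode_record(buf, union, n_fields):
    """Decode n_fields nullable doubles, trusting the given union order."""
    pos, values = 0, []
    for _ in range(n_fields):
        idx, pos = decode_long(buf, pos)
        if not 0 <= idx < len(union):
            raise IndexError(f"union branch index {idx} out of range")
        if union[idx] == "double":
            values.append(struct.unpack_from("<d", buf, pos)[0])
            pos += 8
        else:
            values.append(None)
    return values


if __name__ == "__main__":
    data = encode_record([0.1, 0.2], WRITER_UNION)
    print(decode_record(data, WRITER_UNION, 2))  # same order: round-trips
    try:
        decode_record(data, READER_UNION, 2)
    except IndexError as err:
        print("mismatched union order:", err)
```

In this sketch, decoding with the mismatched order treats the first field as null, consumes no value bytes, and then reads the bytes of the double as the next field's branch index, which falls outside the union. Depending on the values, the mismatched decode may also succeed and silently return wrong data (for example, `[1.5, 2.5]` decodes to two nulls here), so the absence of an exception does not mean the schemas are compatible.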
   
   
   

