Hi Pratyaksh,

Since you are using a Transformer and altering the schema, you need to make sure the target schema (flattened) you hand to the schema provider is different from the source schema (nested):
https://github.com/apache/incubator-hudi/blob/227785c022939cd2ba153c2a4f7791ab3394c6c7/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java#L76
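With FilebasedSchemaProvider that means pointing the provider at two different .avsc files in the properties passed to DeltaStreamer. A rough sketch (property keys as I recall them from FilebasedSchemaProvider.Config, paths are placeholders, so please verify against your 0.4.7 build):

    # passed via --props to HoodieDeltaStreamer, with the schema provider set to
    # com.uber.hoodie.utilities.schema.FilebasedSchemaProvider
    hoodie.deltastreamer.schemaprovider.source.schema.file=/path/to/source_nested.avsc
    hoodie.deltastreamer.schemaprovider.target.schema.file=/path/to/target_flattened.avsc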
Can you check if you are doing that?

Regarding storing the payload as bytes instead of Avro records, this is for performance reasons: the Avro record object keeps a reference to its schema, which bloats up the RDD size.
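Roughly what happens when the payload is built and later read back, conceptually (a simplified sketch using plain Avro APIs, not the exact HoodieAvroUtils code; class and method names here are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class AvroBytesSketch {

      // Serialize the record to plain Avro binary. The resulting byte[] carries no
      // Schema reference, which is why storing bytes keeps the RDD small.
      public static byte[] avroToBytes(GenericRecord record, Schema schema) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
      }

      // Rebuild the record on demand. The writer schema (what produced the bytes) and
      // the reader schema (what we want back) are resolved here; a mismatch between the
      // two shows up as ResolvingDecoder errors like the one in the trace quoted below.
      public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema)
          throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
      }
    }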
Balaji.V

On Wednesday, September 18, 2019, 02:33:06 AM PDT, Pratyaksh Sharma <pratyaks...@gmail.com> wrote:

Also, I am trying to understand why we are storing the OverwriteWithLatestAvroPayload in the form of bytes and not the actual record. Apologies if it is a very basic question, I am working with Avro for the first time.

On Wed, Sep 18, 2019 at 2:25 PM Pratyaksh Sharma <pratyaks...@gmail.com> wrote:

> Hi,
>
> I am trying to use Hudi (hoodie-0.4.7) for building a CDC pipeline. I am
> using AvroKafkaSource and FilebasedSchemaProvider. The source schema looks
> something like this, where all the columns are nested inside a field called
> 'columns':
>
> {
>   "name": "rawdata",
>   "type": "record",
>   "fields": [
>     { "name": "type", "type": "string" },
>     { "name": "timestamp", "type": "string" },
>     { "name": "database", "type": "string" },
>     { "name": "table_name", "type": "string" },
>     { "name": "binlog_filename", "type": "string" },
>     { "name": "binlog_position", "type": "string" },
>     { "name": "columns", "type": { "type": "map", "values": ["null", "string"] } }
>   ]
> }
>
> The target schema has all the columns, and I am using a Transformer class to
> extract the actual column fields from the 'columns' field. Everything seems
> to be working fine, however at the time of the actual write I am getting the
> exception below:
>
> ERROR com.uber.hoodie.io.HoodieIOHandle - Error writing record
> HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=2019/06/20},
> currentLocation='null', newLocation='null'}
> java.lang.ArrayIndexOutOfBoundsException: 123
>   at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>   at com.uber.hoodie.common.util.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:86)
>   at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
>   at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:70)
>   at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:83)
>   at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
>   at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>   at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
> I have verified the schemas, and the data types are fine and in sync. Has
> anyone else faced this issue? Any leads will be helpful.