Also, I am trying to understand why we store the record inside OverwriteWithLatestAvroPayload as bytes rather than as the actual Avro record. Apologies if this is a very basic question; I am working with Avro for the first time.
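
For context, my current mental model is that the payload holds plain Avro binary bytes and only turns them back into a GenericRecord when a schema is supplied, roughly like the sketch below. This is just my own illustration with made-up helper names (toBytes/fromBytes), not the actual Hudi code; the read side is presumably similar to what HoodieAvroUtils.bytesToAvro in the stack trace below is doing:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytesRoundTrip {

  // Serialize a GenericRecord to Avro binary bytes. The bytes carry no schema,
  // so the same schema (or a compatible one) must be supplied again to read them.
  static byte[] toBytes(GenericRecord record, Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  // Deserialize the bytes back into a GenericRecord. writerSchema must be the
  // schema the bytes were written with; readerSchema is the schema to read into.
  static GenericRecord fromBytes(byte[] bytes, Schema writerSchema, Schema readerSchema)
      throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
  }
}

My guess is that keeping bytes instead of a GenericRecord keeps the payload compact and easy to serialize when Spark moves records around, with the record only materialized in getInsertValue once a schema is available, but please correct me if that is not the reason.
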
On Wed, Sep 18, 2019 at 2:25 PM Pratyaksh Sharma <pratyaks...@gmail.com> wrote:

> Hi,
>
> I am trying to use Hudi (hoodie-0.4.7) for building a CDC pipeline. I am
> using AvroKafkaSource and FilebasedSchemaProvider. The source schema looks
> something like this, where all the columns are nested in a field called
> 'columns' -
>
> {
>   "name": "rawdata",
>   "type": "record",
>   "fields": [
>     {
>       "name": "type",
>       "type": "string"
>     },
>     {
>       "name": "timestamp",
>       "type": "string"
>     },
>     {
>       "name": "database",
>       "type": "string"
>     },
>     {
>       "name": "table_name",
>       "type": "string"
>     },
>     {
>       "name": "binlog_filename",
>       "type": "string"
>     },
>     {
>       "name": "binlog_position",
>       "type": "string"
>     },
>     {
>       "name": "columns",
>       "type": {"type": "map", "values": ["null", "string"]}
>     }
>   ]
> }
>
> The target schema has all the columns, and I am using a transformer class
> to extract the actual column fields from the 'columns' field. Everything
> seems to be working fine, however at the time of actual writing I am
> getting the below exception -
>
> ERROR com.uber.hoodie.io.HoodieIOHandle - Error writing record
> HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=2019/06/20},
> currentLocation='null', newLocation='null'}
> java.lang.ArrayIndexOutOfBoundsException: 123
> at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> at com.uber.hoodie.common.util.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:86)
> at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:70)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:83)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
> at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> I have verified the schemas, and the data types are fine and in sync. Has
> anyone else faced this issue? Any leads will be helpful.
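
P.S. While debugging this, I also wrote a small standalone snippet (my own code, only loosely based on the quoted source schema) to convince myself how the nullable values in the 'columns' map are encoded. Posting it here in case it helps someone spot what I am doing wrong:

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ColumnsMapRoundTrip {

  // A trimmed-down version of the source schema: just the nullable-valued
  // 'columns' map, so every map value is written with a union branch index
  // (0 = null, 1 = string) before the value itself.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"rawdata\",\"fields\":["
          + "{\"name\":\"columns\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"string\"]}}]}";

  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

    Map<String, Object> columns = new HashMap<>();
    columns.put("id", "123");
    columns.put("deleted_at", null); // legal because the value union allows null

    GenericRecord record = new GenericData.Record(schema);
    record.put("columns", columns);

    // Write with the source schema...
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();

    // ...and read back with the same writer/reader pair. Decoding these bytes
    // with a reader schema whose fields or unions do not line up is, as far as
    // I understand, the usual way ResolvingDecoder ends up throwing index
    // errors like the one quoted above.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord back = new GenericDatumReader<GenericRecord>(schema, schema).read(null, decoder);
    System.out.println(back);
  }
}

With matching writer and reader schemas this round-trips fine for me, so my current suspicion is that the bytes handed to bytesToAvro were written with a different schema than the one it is reading with, though I may well be missing something.
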