Hi Balaji, Yes target schema is different from source schema in my case. I am attaching sample schemas for your reference.
I was able to solve this issue by using HoodieJsonPayload along with using AvroKafkaSource, where at the time of creating payload, I am calling HoodieJsonPayload(<GenericRecord>.toString()) in HoodieDeltaStreamer so as to use the constructor HoodieJsonPayload (String record), via ReflectionUtils. All the flattening is still getting done by Transformer class. However I am still trying to understand why there was an issue with OverwriteWithLatestAvroPayload. I tried googling around, the most common reason for mentioned exception seems to be the schemas are having some issue. But if schemas had any issue, then it should not even work with HoodieJsonPayload. On Wed, Sep 18, 2019 at 9:34 PM vbal...@apache.org <vbal...@apache.org> wrote: > > Hi Pratyaksh, > Since you are using Transformer and altering the schema, you need to make > sure targetSchema (flattened) is different from source schema (nested). > > https://github.com/apache/incubator-hudi/blob/227785c022939cd2ba153c2a4f7791ab3394c6c7/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java#L76 > > > Can you check if you are doing that ? > Regarding storing payload in bytes instead of AvroRecords, this is for > performance reasons. The specific avro Record object keeps reference to > schema which results in bloating up the RDD size. > Balaji.V On Wednesday, September 18, 2019, 02:33:06 AM PDT, Pratyaksh > Sharma <pratyaks...@gmail.com> wrote: > > Also I am trying to understand why are we storing the > OverwriteWithLatestAvroPayload in the form of bytes and not the actual > record. Apologies if it is a very basic question, I am working on Avro for > the first time. > > On Wed, Sep 18, 2019 at 2:25 PM Pratyaksh Sharma <pratyaks...@gmail.com> > wrote: > > > Hi, > > > > I am trying to use Hudi (hoodie-0.4.7) for building CDC pipeline. I am > > using AvroKafkaSource and FilebasedSchemaProvider. The source schema > looks > > something like this where all the columns are nested in a field called > > 'columns' - > > > > { > > > > "name": "rawdata", > > > > "type": "record", > > > > "fields": [ > > > > { > > > > "name": "type", > > > > "type": "string" > > > > }, > > > > { > > > > "name": "timestamp", > > > > "type": "string" > > > > }, > > > > { > > > > "name": "database", > > > > "type": "string" > > > > }, > > > > { > > > > "name": "table_name", > > > > "type": "string" > > > > }, > > > > { > > > > "name": "binlog_filename", > > > > "type": "string" > > > > }, > > > > { > > > > "name": "binlog_position", > > > > "type": "string" > > > > }, > > > > { > > > > "name": "columns", > > > > "type": {"type": "map", "values": ["null","string"]} > > > > } > > > > ] > > > > } > > > > The target schema has all the columns and I am using transformer class to > > extract the actual column fields from 'columns' field. Everything seems > to > > be working fine, however at the time of actual writing, I am getting the > > below exception - > > > > ERROR com.uber.hoodie.io.HoodieIOHandle - Error writing record > > HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=2019/06/20}, > > currentLocation='null', newLocation='null'} > > java.lang.ArrayIndexOutOfBoundsException: 123 > > at > org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402) > > at org.apache.avro.io > .ResolvingDecoder.doAction(ResolvingDecoder.java:290) > > at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) > > at org.apache.avro.io > .ResolvingDecoder.readIndex(ResolvingDecoder.java:267) > > at > > > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155) > > at > > > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) > > at > > > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) > > at > > > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) > > at > > > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) > > at > > > com.uber.hoodie.common.util.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:86) > > at > > > com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69) > > at > > > com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:70) > > at > > > com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:83) > > at > > > com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175) > > at > > > com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45) > > at > > > com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94) > > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > > at > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > at java.lang.Thread.run(Thread.java:748) > > > > I have verified the schemas and the data types are fine and in sync. Has > > anyone else faced this issue? Any leads will be helpful. > > >