Ok, let me try and get back to you.
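To act on your suggestion below, I am planning to add some temporary logging around the point where the payload bytes get decoded, roughly along these lines (plain Avro APIs, a simplified sketch rather than the actual Hudi code; the method and variable names are mine):

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class DecodeDebug {

      // Decode stored payload bytes while printing both schemas involved, so a
      // writer/reader mismatch becomes visible before the ResolvingDecoder fails.
      public static GenericRecord decodeWithLogging(byte[] payloadBytes,
          Schema writerSchema, Schema readerSchema) throws IOException {
        System.out.println("writer schema: " + writerSchema.toString(true));
        System.out.println("reader schema: " + readerSchema.toString(true));
        GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payloadBytes, null);
        GenericRecord record = datumReader.read(null, decoder);
        System.out.println("decoded record: " + record);
        return record;
      }
    }

If the writer and reader schemas printed there turn out to be different, that should explain the resolution failure.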
On Fri, Sep 20, 2019 at 2:02 AM vbal...@apache.org <vbal...@apache.org> wrote:

> Hi Pratyaksh,
>
> Looks like you forgot to attach the target schema. Anyway, to debug these
> schema issues, can you try printing both the schema and the record during
> encoding and decoding (when writing records) to get some idea of what is
> happening.
>
> Balaji.V
>
> On Thursday, September 19, 2019, 12:16:00 AM PDT, Pratyaksh Sharma <pratyaks...@gmail.com> wrote:
>
> Hi Balaji,
>
> Yes, the target schema is different from the source schema in my case. I am
> attaching sample schemas for your reference.
>
> I was able to solve this issue by using HoodieJsonPayload along with
> AvroKafkaSource: at the time of creating the payload in HoodieDeltaStreamer,
> I call HoodieJsonPayload(<GenericRecord>.toString()) so that the
> HoodieJsonPayload(String record) constructor is used, via ReflectionUtils.
> All the flattening is still done by the Transformer class.
>
> However, I am still trying to understand why there was an issue with
> OverwriteWithLatestAvroPayload. I tried googling around; the most common
> reason for the mentioned exception seems to be some issue with the schemas.
> But if the schemas had any issue, then it should not even work with
> HoodieJsonPayload.
>
> On Wed, Sep 18, 2019 at 9:34 PM vbal...@apache.org <vbal...@apache.org> wrote:
>
> Hi Pratyaksh,
>
> Since you are using a Transformer and altering the schema, you need to make
> sure the target schema (flattened) is different from the source schema
> (nested):
> https://github.com/apache/incubator-hudi/blob/227785c022939cd2ba153c2a4f7791ab3394c6c7/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java#L76
>
> Can you check if you are doing that?
>
> Regarding storing the payload as bytes instead of Avro records, this is for
> performance reasons: the specific Avro record object keeps a reference to
> its schema, which bloats the RDD size.
>
> Balaji.V
>
> On Wednesday, September 18, 2019, 02:33:06 AM PDT, Pratyaksh Sharma <pratyaks...@gmail.com> wrote:
>
> Also, I am trying to understand why we are storing the
> OverwriteWithLatestAvroPayload in the form of bytes and not the actual
> record. Apologies if it is a very basic question; I am working with Avro
> for the first time.
>
> On Wed, Sep 18, 2019 at 2:25 PM Pratyaksh Sharma <pratyaks...@gmail.com> wrote:
>
> > Hi,
> >
> > I am trying to use Hudi (hoodie-0.4.7) for building a CDC pipeline. I am
> > using AvroKafkaSource and FilebasedSchemaProvider. The source schema
> > looks something like this, where all the columns are nested in a field
> > called 'columns':
> >
> > {
> >   "name": "rawdata",
> >   "type": "record",
> >   "fields": [
> >     { "name": "type", "type": "string" },
> >     { "name": "timestamp", "type": "string" },
> >     { "name": "database", "type": "string" },
> >     { "name": "table_name", "type": "string" },
> >     { "name": "binlog_filename", "type": "string" },
> >     { "name": "binlog_position", "type": "string" },
> >     { "name": "columns", "type": {"type": "map", "values": ["null", "string"]} }
> >   ]
> > }
> >
> > The target schema has all the columns at the top level, and I am using a
> > Transformer class to extract the actual column fields from the 'columns'
> > field.
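> >
> > To give an idea, the flattening step in the transformer does something
> > roughly like the following (a simplified sketch with placeholder column
> > names, not the actual code):
> >
> >   import org.apache.spark.sql.Dataset;
> >   import org.apache.spark.sql.Row;
> >
> >   // Promote selected entries of the 'columns' map to top-level columns,
> >   // then drop the map itself. "id" and "created_at" are just placeholders
> >   // for the real column names.
> >   static Dataset<Row> flatten(Dataset<Row> rowDataset) {
> >     return rowDataset
> >         .withColumn("id", rowDataset.col("columns").getItem("id"))
> >         .withColumn("created_at", rowDataset.col("columns").getItem("created_at"))
> >         .drop("columns");
> >   }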
> >
> > Everything seems to be working fine; however, at the time of the actual
> > write, I am getting the below exception:
> >
> > ERROR com.uber.hoodie.io.HoodieIOHandle - Error writing record
> > HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=2019/06/20},
> > currentLocation='null', newLocation='null'}
> > java.lang.ArrayIndexOutOfBoundsException: 123
> >   at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
> >   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
> >   at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> >   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> >   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> >   at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> >   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> >   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> >   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> >   at com.uber.hoodie.common.util.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:86)
> >   at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
> >   at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:70)
> >   at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:83)
> >   at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
> >   at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> >   at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >   at java.lang.Thread.run(Thread.java:748)
> >
> > I have verified that the schemas and the data types are fine and in sync.
> > Has anyone else faced this issue? Any leads will be helpful.
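P.S. To tie the stack trace above back to the schema discussion: my working theory (which may well be wrong) is that the payload bytes end up being decoded with a schema other than the one they were written with. The standalone sketch below uses toy schemas of my own, shaped loosely like a nested source and a flattened target with nullable union columns, to show that kind of mix-up outside Hudi: the decoder reads the length of a string where it expects a union index, which is exactly the sort of thing that ends with Symbol$Alternative.getSymbol throwing an ArrayIndexOutOfBoundsException.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Collections;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class SchemaMixupSketch {

      // Toy stand-ins for the real schemas: a nested "source" record and a
      // flattened "target" record with nullable union columns.
      private static final Schema NESTED = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"nested\",\"fields\":["
              + "{\"name\":\"type\",\"type\":\"string\"},"
              + "{\"name\":\"columns\",\"type\":{\"type\":\"map\",\"values\":[\"null\",\"string\"]}}]}");
      private static final Schema FLAT = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"flat\",\"fields\":["
              + "{\"name\":\"type\",\"type\":[\"null\",\"string\"],\"default\":null},"
              + "{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

      public static void main(String[] args) throws IOException {
        // Write a record using the nested schema.
        GenericRecord record = new GenericData.Record(NESTED);
        record.put("type", "insert");
        record.put("columns", Collections.singletonMap("id", "123"));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(NESTED).write(record, encoder);
        encoder.flush();

        // Decode the same bytes as if they had been written with the flattened
        // schema. The decoder now sees the string length of "insert" where it
        // expects a union index, and resolution should blow up inside the
        // ResolvingDecoder, much like the exception in the thread above.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(FLAT).read(null, decoder);
        System.out.println(decoded);
      }
    }

If the real payload bytes decode cleanly when the nested schema is passed as the writer schema but fail with the flattened one, that would point to the same mix-up.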