Hi Pratyaksh,
It looks like you forgot to attach the target schema. In any case, to debug these
schema issues, can you try printing both the schema and the record during encoding and
decoding (when writing records) to get some idea of what is happening?
Balaji.V

On Thursday, September 19, 2019, 12:16:00 AM PDT, Pratyaksh Sharma
<pratyaks...@gmail.com> wrote:
 
 Hi Balaji, 

Yes, the target schema is different from the source schema in my case. I am attaching
sample schemas for your reference.
I was able to solve this issue by using HoodieJsonPayload together with
AvroKafkaSource. When creating the payload in HoodieDeltaStreamer, I call
HoodieJsonPayload(<GenericRecord>.toString()) via ReflectionUtils, so that the
constructor HoodieJsonPayload(String record) is used. All
the flattening is still done by the Transformer class.
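
For readers following along, the workaround amounts to choosing a payload class by name and invoking its single-String constructor reflectively. The sketch below uses only the JDK. `JsonStringPayload` and `PayloadLoader.load` are hypothetical stand-ins for illustration, not Hudi's HoodieJsonPayload or ReflectionUtils, whose actual signatures may differ.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper; NOT Hudi's ReflectionUtils.
class PayloadLoader {
    // Looks up the class by name and calls its public (String) constructor.
    static Object load(String className, String recordAsJson) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> ctor = clazz.getConstructor(String.class);
            return ctor.newInstance(recordAsJson);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // In the workaround, this string would come from <GenericRecord>.toString().
        String json = "{\"type\": \"insert\", \"table_name\": \"orders\"}";
        Object payload = load(JsonStringPayload.class.getName(), json);
        System.out.println(((JsonStringPayload) payload).getJson());
    }
}

// Hypothetical stand-in for a payload that keeps the record as a JSON string.
class JsonStringPayload {
    private final String json;

    // Single-String constructor, the shape described in the message above.
    public JsonStringPayload(String json) {
        this.json = json;
    }

    String getJson() {
        return json;
    }
}
```

Because the record travels as a JSON string, nothing is decoded from Avro binary until the payload is converted back against a schema.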
However, I am still trying to understand why there was an issue with
OverwriteWithLatestAvroPayload. From searching around, the most common cause
of this exception seems to be a problem with the schemas. But if the
schemas had a problem, it should not work with HoodieJsonPayload either.
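
One way such an exception can arise, offered as a guess rather than a diagnosis of this pipeline: the top frames of the trace (ResolvingDecoder.readIndex, Symbol$Alternative.getSymbol) are where Avro reads a union branch index. Avro binary data is positional, so if bytes written under one schema are read under another, a string's length prefix can be taken for a union index. The JDK-only sketch below hand-rolls Avro's zig-zag varint encoding to show the effect. The class and method names are hypothetical, and it does not use the Avro library.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class UnionIndexMismatchDemo {
    // Avro-style zig-zag variable-length encoding of a long.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Avro-style string: length prefix, then the UTF-8 bytes.
    static byte[] encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Reads the zig-zag varint at the start of the buffer.
    static long readLong(byte[] data) {
        long n = 0;
        int shift = 0;
        int pos = 0;
        int b;
        do {
            b = data[pos++] & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // Treats the leading varint as a union branch index, as a reader
    // expecting a union such as ["null", "string"] would.
    static String readUnionBranch(byte[] data, String[] branches) {
        int index = (int) readLong(data);
        return branches[index]; // out-of-range index throws ArrayIndexOutOfBoundsException
    }

    public static void main(String[] args) {
        String[] readerUnion = {"null", "string"};
        // Written as a plain string, but read as if it were a union.
        byte[] written = encodeString("2019/06/20");
        try {
            readUnionBranch(written, readerUnion);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("Decoded branch index " + readLong(written)
                + " for a union with " + readerUnion.length + " branches");
        }
    }
}
```

A JSON payload is presumably matched to the target schema by field name rather than by byte position, which would explain why it does not hit this kind of misread even when the two schemas differ.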


On Wed, Sep 18, 2019 at 9:34 PM vbal...@apache.org <vbal...@apache.org> wrote:

 
Hi Pratyaksh,
Since you are using a Transformer and altering the schema, you need to make sure
the targetSchema (flattened) is different from the source schema (nested).
https://github.com/apache/incubator-hudi/blob/227785c022939cd2ba153c2a4f7791ab3394c6c7/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java#L76


Can you check if you are doing that?
Regarding storing the payload as bytes instead of Avro records, this is for
performance reasons. The specific Avro record object keeps a reference to its schema,
which bloats the RDD size.
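
A rough JDK-only illustration of the size argument, assuming each record is Java-serialized on its own (how Spark actually serializes records depends on its configuration). `RecordWithSchema` is a hypothetical class that carries its schema text next to the data. It is not Avro's record type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class PayloadSizeDemo {
    // Hypothetical record type that drags its schema along with the data.
    static class RecordWithSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        final String schemaJson;
        final byte[] data;

        RecordWithSchema(String schemaJson, byte[] data) {
            this.schemaJson = schemaJson;
            this.data = data;
        }
    }

    // Size in bytes of the object when Java-serialized by itself.
    static int serializedSize(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        String schemaJson = "{\"name\": \"rawdata\", \"type\": \"record\", \"fields\": "
            + "[{\"name\": \"type\", \"type\": \"string\"}, "
            + "{\"name\": \"timestamp\", \"type\": \"string\"}]}";
        byte[] data = "insert|2019-06-20".getBytes(StandardCharsets.UTF_8);

        System.out.println("bytes only:  " + serializedSize(data));
        System.out.println("with schema: " + serializedSize(new RecordWithSchema(schemaJson, data)));
    }
}
```

The per-record overhead grows with the size of the schema, which is why holding only the encoded bytes keeps each payload small.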
Balaji.V

On Wednesday, September 18, 2019, 02:33:06 AM PDT, Pratyaksh
Sharma <pratyaks...@gmail.com> wrote:

Also, I am trying to understand why we are storing the
OverwriteWithLatestAvroPayload in the form of bytes and not the actual
record. Apologies if it is a very basic question; I am working with Avro for
the first time.

On Wed, Sep 18, 2019 at 2:25 PM Pratyaksh Sharma <pratyaks...@gmail.com>
wrote:

> Hi,
>
> I am trying to use Hudi (hoodie-0.4.7) to build a CDC pipeline. I am
> using AvroKafkaSource and FilebasedSchemaProvider. The source schema looks
> something like this, where all the columns are nested in a field called
> 'columns' -
>
> {
>  "name": "rawdata",
>  "type": "record",
>  "fields": [
>    {
>      "name": "type",
>      "type": "string"
>    },
>    {
>      "name": "timestamp",
>      "type": "string"
>    },
>    {
>      "name": "database",
>      "type": "string"
>    },
>    {
>      "name": "table_name",
>      "type": "string"
>    },
>    {
>      "name": "binlog_filename",
>      "type": "string"
>    },
>    {
>      "name": "binlog_position",
>      "type": "string"
>    },
>    {
>      "name": "columns",
>      "type": {"type": "map", "values": ["null","string"]}
>    }
>  ]
> }
>
> The target schema has all the columns, and I am using a transformer class to
> extract the actual column fields from the 'columns' field. Everything seems to
> be working fine; however, at the time of actual writing, I am getting the
> exception below -
>
> ERROR com.uber.hoodie.io.HoodieIOHandle  - Error writing record
> HoodieRecord{key=HoodieKey { recordKey=123 partitionPath=2019/06/20},
> currentLocation='null', newLocation='null'}
> java.lang.ArrayIndexOutOfBoundsException: 123
> at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> at com.uber.hoodie.common.util.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:86)
> at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:70)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:83)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
> at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> I have verified the schemas and the data types are fine and in sync. Has
> anyone else faced this issue? Any leads will be helpful.
>
  
  
