Hi Katie,
Yes, your observation is correct.
The problem is that KafkaAvroDecoder does not guarantee that it returns
generic records deserialized with the latest schema. This causes deserialization
errors in DeltaStreamer at write time when the input batch contains records
conforming to older schema versions and Hudi tries to deserialize those bytes
with the latest schema.
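For context, Avro itself can resolve records written with an older schema into
the latest one, as long as both schemas are available at read time. Below is a
minimal, hypothetical Java sketch of that mechanism (it is not the code in the
PR). It assumes the payload is the raw Avro body, i.e. the Confluent wire-format
prefix (magic byte plus 4-byte schema id) has already been stripped, and that
the writer schema was fetched from the registry using that id.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Hypothetical helper: decode the bytes with the schema they were written in,
// then let Avro schema resolution project the record into the latest schema,
// so everything handed to the write path has the same (latest) shape.
public class SchemaResolvingDecoder {

  public static GenericRecord decode(byte[] payload,
                                     Schema writerSchema,
                                     Schema latestSchema) throws IOException {
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, latestSchema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    return reader.read(null, decoder);
  }
}
```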
Another user mentioned this before and is in the process of submitting a PR.
Until then, I have put together a temporary PR to unblock you. You can look at
the PR to get the general idea and fix any bugs you spot (I don't have a test
setup handy for this):
https://github.com/apache/incubator-hudi/pull/763
Can you build Hudi with the above PR and see if you are able to ingest records
with the old schema?
Thanks,
Balaji.V

On Wednesday, June 26, 2019, 10:52:16 AM PDT, Vinoth Chandar
<[email protected]> wrote:

Are you using DeltaStreamer with the Confluent Schema Registry? I think you
read the stack trace right: Schema Registry may be using the latest schema
instead of reading it with the schema the record was written in. I remember
Balaji alluded to a (related?) issue around this. Balaji?

On Wed, Jun 26, 2019 at 3:08 AM Katie Frost <[email protected]> wrote:

> Hey,
>
> 1. I'm using MERGE_ON_READ but I get the same error regardless of table
> type.
>
> 2. The exception I'm seeing is:
>
> 2019-06-26 09:29:46 ERROR HoodieIOHandle:139 - Error writing record
> HoodieRecord{key=HoodieKey { recordKey=f01ce1af-9566-44dd-babc-4147f72ad531
> partitionPath=default}, currentLocation='null', newLocation='null'}
> java.lang.ArrayIndexOutOfBoundsException: 3
> at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135)
> at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
> at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
> at org.apache.avro.generic.GenericData.validate(GenericData.java:382)
> at org.apache.avro.generic.GenericData.validate(GenericData.java:395)
> at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
> at com.uber.hoodie.common.util.HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.java:192)
> at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:72)
> at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:85)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
> at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$startProducers$0(BoundedInMemoryExecutor.java:94)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> and I'm seeing that error for every record that doesn't conform to the current
> schema.
> In the newer schema, 2 extra fields have been added to an array, making 5
> elements in the array. In the older schema there are 3 fields in the array,
> so when it says 'index out of bounds: 3' I am assuming it is expecting the
> older data to have the extra fields added in the later schema.
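That reading matches what Avro's GenericData does internally: validate walks the
field positions of the schema it is handed, so a record built with fewer fields
runs out of value slots. A minimal, hypothetical Java sketch of that failure
mode (the schemas and field names here are invented for illustration):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

// Hypothetical schemas: the old version has 3 fields, the new version adds 2 more.
public class ValidateMismatch {
  public static void main(String[] args) {
    Schema oldSchema = SchemaBuilder.record("Example").fields()
        .requiredString("a").requiredString("b").requiredString("c")
        .endRecord();
    Schema newSchema = SchemaBuilder.record("Example").fields()
        .requiredString("a").requiredString("b").requiredString("c")
        .optionalString("d").optionalString("e")
        .endRecord();

    // A record created against the old schema only has slots for 3 field values.
    GenericData.Record oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("a", "1");
    oldRecord.put("b", "2");
    oldRecord.put("c", "3");

    // Validating it against the new schema reads field position 3 ("d"),
    // which throws ArrayIndexOutOfBoundsException: 3, as in the stack trace above.
    GenericData.get().validate(newSchema, oldRecord);
  }
}
```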
>
> 3. By full compatibility I mean the Avro schema changes are both forward
> and backward compatible: new data can be read with the older schema and old
> data can be read with the newer schema (we're enforcing this using the
> Confluent Schema Registry). Docs about it can be found here:
> https://docs.confluent.io/current/schema-registry/avro.html
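That claim can also be sanity-checked locally with Avro's own compatibility
utility; a small sketch, assuming the two schema versions have been loaded into
oldSchema and newSchema:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

// Full compatibility = backward (new reader, old writer) AND forward (old reader, new writer).
public class FullCompatibilityCheck {

  public static boolean isFullyCompatible(Schema oldSchema, Schema newSchema) {
    boolean backward = SchemaCompatibility
        .checkReaderWriterCompatibility(newSchema, oldSchema)   // old data read with new schema
        .getType() == SchemaCompatibilityType.COMPATIBLE;
    boolean forward = SchemaCompatibility
        .checkReaderWriterCompatibility(oldSchema, newSchema)   // new data read with old schema
        .getType() == SchemaCompatibilityType.COMPATIBLE;
    return backward && forward;
  }
}
```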
>
> Thanks,
> Katie
>
> On Tue, 25 Jun 2019 at 19:11, nishith agarwal <[email protected]> wrote:
>
> > Hi Katie,
> >
> > Thanks for explaining the problem in detail. Could you give us some more
> > information before I can help you with this?
> >
> > 1. What table type are you using - COPY_ON_WRITE or MERGE_ON_READ?
> > 2. Could you paste the exception you see in Hudi?
> > 3. "Despite the schema having full compatibility" -> Can you explain what
> > you mean by "full compatibility"?
> >
> > Thanks,
> > Nishith
> >
> > On Tue, Jun 25, 2019 at 10:32 AM Katie Frost <[email protected]>
> > wrote:
> >
> > > Hi,
> > >
> > > I've been using the Hudi DeltaStreamer to create datasets in S3 and I've
> > > had issues with Hudi acknowledging schema compatibility.
> > >
> > > I'm trying to run a Spark job ingesting Avro data into a Hudi dataset in
> > > S3, with the raw Avro source data also stored in S3. The raw Avro data
> > > has two different schema versions, and I have supplied the job with the
> > > latest schema. However, the job fails to ingest any of the data that is
> > > not up to date with the latest schema and ingests only the data matching
> > > the given schema, despite the schema having full compatibility. Is this a
> > > known issue, or just a case of missing some configuration?
> > >
> > > The error I get when running the job to ingest the data not up to date
> > > with the latest Avro schema is an ArrayIndexOutOfBoundsException, and I
> > > know it is a schema issue, as I have tested running the job with the
> > > older schema version, removing any data that matches the latest schema,
> > > and the job runs successfully.
> > >
> >
>
  
