This PR also touches on the same issue. https://github.com/apache/incubator-hudi/pull/765/files
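For anyone skimming the thread, below is a minimal sketch of the general idea Balaji describes further down (hypothetical helper name, not the code in either linked PR): round-trip each incoming record through Avro serialization so it is written with the schema it actually conforms to and read back against the latest schema, letting Avro schema resolution fill in newly added fields from their defaults before the record reaches the Hudi writer.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class LatestSchemaRewrite {

  // Write the record with the schema it actually has, then read it back with
  // (writerSchema, latestSchema) so Avro schema resolution upgrades it.
  // Assumes the two schema versions are compatible (new fields have defaults).
  public static GenericRecord rewriteToLatest(GenericRecord record, Schema latestSchema)
      throws java.io.IOException {
    Schema writerSchema = record.getSchema();

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
    encoder.flush();

    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null);
    return new GenericDatumReader<GenericRecord>(writerSchema, latestSchema).read(null, decoder);
  }
}

In a DeltaStreamer-style source, a helper like this would be applied to each record coming out of the Kafka decoder before the batch is handed to the writer; the PRs linked in this thread are the source of truth for how Hudi actually does it.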
On Thu, Jun 27, 2019 at 3:15 AM Katie Frost <[email protected]> wrote:

Hi Balaji,

Thanks! I'll test out the PR and let you know how it goes.

On Thu, 27 Jun 2019 at 09:03, [email protected] <[email protected]> wrote:

Hi Katie,

Yes, you are correct in your observation. The problem is that KafkaAvroDecoder does not guarantee that it will return generic records deserialized with the latest schema. This causes deserialization errors in the DeltaStreamer at write time, when the input batch contains records conforming to old schema versions and Hudi tries to deserialize the bytes with the latest schema. Another user mentioned this before and is in the process of submitting a PR. Until then, I have cooked up a temporary PR to unblock you. You can look at the PR to get the general idea and fix any bugs that you see (I don't have a test setup handy for this):
https://github.com/apache/incubator-hudi/pull/763

Can you build Hudi with the above PR and see whether you are able to ingest records with the old schema?

Thanks,
Balaji.V

On Wednesday, June 26, 2019, 10:52:16 AM PDT, Vinoth Chandar <[email protected]> wrote:

Are you using DeltaStreamer with the Confluent Schema Registry? I think you read the stack trace right: the Schema Registry decoder may be using the latest schema instead of reading each record with the schema it was written in. I remember Balaji alluded to a (related?) issue around this. Balaji?

On Wed, Jun 26, 2019 at 3:08 AM Katie Frost <[email protected]> wrote:

Hey,

1. I'm using MERGE_ON_READ, but I get the same error regardless of table type.

2. The exception I'm seeing is:

2019-06-26 09:29:46 ERROR HoodieIOHandle:139 - Error writing record HoodieRecord{key=HoodieKey { recordKey=f01ce1af-9566-44dd-babc-4147f72ad531 partitionPath=default}, currentLocation='null', newLocation='null'}
java.lang.ArrayIndexOutOfBoundsException: 3
  at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135)
  at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
  at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
  at org.apache.avro.generic.GenericData.validate(GenericData.java:382)
  at org.apache.avro.generic.GenericData.validate(GenericData.java:395)
  at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
  at com.uber.hoodie.common.util.HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.java:192)
  at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
  at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:72)
  at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:85)
  at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
  at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
  at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$startProducers$0(BoundedInMemoryExecutor.java:94)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

and I'm seeing that error for every record that doesn't conform to the current schema.

In the newer schema, 2 extra fields have been added to an array, making 5 elements in the array; in the older schema there are 3 fields in the array. So when it says "index out of bounds: 3" I am assuming it is expecting the older data to have the extra fields added in the later schema.

3. By full compatibility I mean the Avro schema changes are both forward and backward compatible: new data can be read with the older schema and old data can be read with the newer schema (we're enforcing this using the Confluent Schema Registry). Docs about it are here:
https://docs.confluent.io/current/schema-registry/avro.html

Thanks,
Katie

On Tue, 25 Jun 2019 at 19:11, nishith agarwal <[email protected]> wrote:

Hi Katie,

Thanks for explaining the problem in detail. Could you give us some more information before I can help you with this?

1. What table type are you using - COPY_ON_WRITE or MERGE_ON_READ?
2. Could you paste the exception you see in Hudi?
3. "Despite the schema having full compatibility" -> Can you explain what you mean by "full compatibility"?

Thanks,
Nishith

On Tue, Jun 25, 2019 at 10:32 AM Katie Frost <[email protected]> wrote:

Hi,

I've been using the Hudi DeltaStreamer to create datasets in S3 and I've had issues with Hudi recognizing schema compatibility.

I'm trying to run a Spark job ingesting Avro data into a Hudi dataset in S3, with the raw Avro source data also stored in S3. The raw Avro data has two different schema versions, and I have supplied the job with the latest schema. However, the job fails to ingest any of the data that is not up to date with the latest schema and ingests only the data matching the given schema, despite the schema having full compatibility. Is this a known issue, or just a case of missing some configuration?

The error I get when running the job on the data that is not up to date with the latest Avro schema is an ArrayIndexOutOfBoundsException, and I know it is a schema issue because I have tested running the job with the older schema version, removing any data that matches the latest schema, and the job runs successfully.
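As an aside on point 3 above ("full compatibility"), the sketch below (hypothetical field names, a flat record rather than the nested array in the real schema, behavior as seen on the Avro versions in the stack trace) shows both directions of Avro compatibility checking, and why a record that was decoded with the old three-field schema still blows up when it is walked against the new five-field schema, matching the "ArrayIndexOutOfBoundsException: 3" above.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class CompatibilityDemo {

  // Hypothetical "old" version of a record: three string fields.
  static final Schema OLD = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Item\",\"fields\":["
          + "{\"name\":\"a\",\"type\":\"string\"},"
          + "{\"name\":\"b\",\"type\":\"string\"},"
          + "{\"name\":\"c\",\"type\":\"string\"}]}");

  // Hypothetical "new" version: two extra fields, each with a default, which is
  // what makes the change both backward and forward compatible.
  static final Schema NEW = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Item\",\"fields\":["
          + "{\"name\":\"a\",\"type\":\"string\"},"
          + "{\"name\":\"b\",\"type\":\"string\"},"
          + "{\"name\":\"c\",\"type\":\"string\"},"
          + "{\"name\":\"d\",\"type\":\"string\",\"default\":\"\"},"
          + "{\"name\":\"e\",\"type\":\"string\",\"default\":\"\"}]}");

  public static void main(String[] args) {
    // "Full compatibility": each version can act as the reader schema for data
    // written with the other.
    System.out.println(SchemaCompatibility.checkReaderWriterCompatibility(NEW, OLD).getType());
    System.out.println(SchemaCompatibility.checkReaderWriterCompatibility(OLD, NEW).getType());
    // both print COMPATIBLE

    // But compatibility is applied at read time, via schema resolution. A
    // GenericRecord decoded with OLD only holds three values, so walking NEW's
    // five field positions over it fails at index 3 -- the same failure mode
    // as the stack trace above.
    GenericRecord oldRecord = new GenericData.Record(OLD);
    oldRecord.put("a", "1");
    oldRecord.put("b", "2");
    oldRecord.put("c", "3");
    try {
      GenericData.get().validate(NEW, oldRecord);
    } catch (ArrayIndexOutOfBoundsException e) {
      System.out.println("validate threw: " + e);
    }
  }
}

So the data itself is fine; the records just need to be resolved against the latest schema at read time (as in the sketch near the top of this thread and the linked PRs) rather than passed through in their writer-schema form.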
