Hi Balaji,

Thanks! I'll test out the PR and let you know how it goes.
On Thu, 27 Jun 2019 at 09:03, [email protected] <[email protected]> wrote:

> Hi Katie,
>
> Yes, you are correct in your observation.
> The problem is that KafkaAvroDecoder does not guarantee that it will
> return generic records deserialized with the latest schema. This causes
> deserialization errors in DeltaStreamer at write time when an input
> batch has records conforming to old schema versions and Hudi tries to
> deserialize the bytes with the latest schema.
>
> Another user mentioned this before and is in the process of
> submitting a PR. Until then, I have cooked up a temporary PR to unblock you.
> You can look at the PR to get the general idea and fix any bugs that you
> see (I don't have a test setup handy for this).
> https://github.com/apache/incubator-hudi/pull/763
>
> Can you build Hudi with the above PR and see whether you are able to ingest
> records with the old schema?
>
> Thanks,
> Balaji.V
>
> On Wednesday, June 26, 2019, 10:52:16 AM PDT, Vinoth Chandar <[email protected]> wrote:
>
> Are you using DeltaStreamer with the Confluent Schema Registry? I think you
> read the stack trace right. Schema Registry may be using the latest schema
> instead of reading each record with the schema it was written in. I
> remember Balaji alluded to a (related?) issue around this. Balaji?
>
> On Wed, Jun 26, 2019 at 3:08 AM Katie Frost <[email protected]> wrote:
>
> > Hey,
> >
> > 1. I'm using MERGE_ON_READ, but I get the same error regardless of table
> > type.
> >
> > 2. The exception I'm seeing is:
> >
> > 2019-06-26 09:29:46 ERROR HoodieIOHandle:139 - Error writing record HoodieRecord{key=HoodieKey {
> > recordKey=f01ce1af-9566-44dd-babc-4147f72ad531 partitionPath=default}, currentLocation='null', newLocation='null'}
> > java.lang.ArrayIndexOutOfBoundsException: 3
> >     at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135)
> >     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
> >     at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
> >     at org.apache.avro.generic.GenericData.validate(GenericData.java:382)
> >     at org.apache.avro.generic.GenericData.validate(GenericData.java:395)
> >     at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
> >     at com.uber.hoodie.common.util.HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.java:192)
> >     at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
> >     at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:72)
> >     at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:85)
> >     at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
> >     at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> >     at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$startProducers$0(BoundedInMemoryExecutor.java:94)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > I'm seeing that error for every record that doesn't conform to the current
> > schema.
> > In the newer schema, 2 extra fields have been added to an array, making 5
> > elements in the array. In the older schema there are 3 fields in the array,
> > so when it says 'index out of bounds: 3' I am assuming it is expecting the
> > older data to have the extra fields added in the later schema.
> >
> > 3. By full compatibility I mean the Avro schema changes are both forward
> > and backward compatible: new data can be read with the older schema and old
> > data can be read with the newer schema (we're enforcing this using the
> > Confluent Schema Registry). Docs about it can be found here:
> > https://docs.confluent.io/current/schema-registry/avro.html
> >
> > Thanks,
> > Katie
> >
> > On Tue, 25 Jun 2019 at 19:11, nishith agarwal <[email protected]> wrote:
> >
> > > Hi Katie,
> > >
> > > Thanks for explaining the problem in detail. Could you give us some more
> > > information before I can help you with this?
> > >
> > > 1. What table type are you using: COPY_ON_WRITE or MERGE_ON_READ?
> > > 2. Could you paste the exception you see in Hudi?
> > > 3. "Despite the schema having full compatibility" -> Can you explain what
> > > you mean by "full compatibility"?
> > >
> > > Thanks,
> > > Nishith
> > >
> > > On Tue, Jun 25, 2019 at 10:32 AM Katie Frost <[email protected]> wrote:
> > >
> > > > Hi,
> > > >
> > > > I've been using the Hudi delta streamer to create datasets in S3, and I've
> > > > had issues with Hudi acknowledging schema compatibility.
> > > >
> > > > I'm trying to run a Spark job ingesting Avro data to a Hudi dataset in S3,
> > > > with the raw Avro source data also stored in S3. The raw Avro data has two
> > > > different schema versions, and I have supplied the job with the latest
> > > > schema. However, the job fails to ingest any of the data that is not up to
> > > > date with the latest schema and ingests only the data matching the given
> > > > schema, despite the schema having full compatibility. Is this a known
> > > > issue, or just a case of missing some configuration?
> > > >
> > > > The error I get when running the job to ingest the data that is not up to
> > > > date with the latest Avro schema is an array index out of bounds exception.
> > > > I know it is a schema issue because I have tested running the job with the
> > > > older schema version, removing any data that matches the latest schema, and
> > > > the job runs successfully.
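To make Balaji's explanation concrete, here is a dependency-free Java sketch of why the write fails and what schema resolution does instead. It is a simplified model, not Avro or Hudi code. It assumes, as Avro's GenericData.Record does, that a record's values live in a positional array. Under that assumption, asking a 3-field record written with the old schema for field index 3 of the newer schema throws the same kind of ArrayIndexOutOfBoundsException seen in the trace. Resolving by field name against the writer's schema, filling reader-only fields from their defaults and dropping writer-only fields, follows the schema resolution rules in the Avro specification. The class name, field names, and default values are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaResolutionSketch {

    // Hypothetical field description: a name plus a default value.
    // Simplification: null means "no default" here (real Avro also allows a null default).
    static final class Field {
        final String name;
        final Object defaultValue;

        Field(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    // Mirrors GenericData.Record.get(int): a plain index into the value array, with no schema check.
    static Object getByPosition(Object[] values, int pos) {
        return values[pos];
    }

    // Resolves a record written with writerSchema into readerSchema by field NAME.
    static Object[] resolve(Object[] values, List<Field> writerSchema, List<Field> readerSchema) {
        Map<String, Object> byName = new LinkedHashMap<>();
        for (int i = 0; i < writerSchema.size(); i++) {
            byName.put(writerSchema.get(i).name, values[i]);
        }
        Object[] out = new Object[readerSchema.size()];
        for (int i = 0; i < readerSchema.size(); i++) {
            Field f = readerSchema.get(i);
            if (byName.containsKey(f.name)) {
                out[i] = byName.get(f.name);   // present in the writer's data
            } else if (f.defaultValue != null) {
                out[i] = f.defaultValue;       // added in the newer schema: use its default
            } else {
                throw new IllegalArgumentException("no value or default for field " + f.name);
            }
        }
        return out;                            // writer-only fields are dropped
    }

    public static void main(String[] args) {
        List<Field> oldSchema = Arrays.asList(
                new Field("id", null), new Field("name", null), new Field("qty", null));
        List<Field> newSchema = Arrays.asList(
                new Field("id", null), new Field("name", null), new Field("qty", null),
                new Field("unit", "each"), new Field("note", ""));
        Object[] oldRecord = {"a1", "widget", 2};

        // Treating the old record as if it had the new layout: index 3 does not exist.
        try {
            getByPosition(oldRecord, 3);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("positional read failed: " + e);
        }

        // Name-based resolution succeeds and fills the two new fields from their defaults.
        System.out.println(Arrays.toString(resolve(oldRecord, oldSchema, newSchema)));
    }
}
```

In real Avro code, the equivalent step is to give the decoder both schemas: org.apache.avro.generic.GenericDatumReader has a (writer schema, reader schema) constructor that applies these resolution rules while decoding. Handling an old record with only the latest schema skips that step.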
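Katie's definition of full compatibility can also be sketched. The simplified Java model below assumes that fields are only added or removed, and it ignores types, unions, aliases, and nested records. It encodes the Avro rule that a reader can decode a writer's data when every reader field missing from the writer has a default. In the Confluent Schema Registry terminology linked in the thread, "backward" means new readers on old data, "forward" means old readers on new data, and "full" means both. The class and field names are hypothetical. For real schemas, Avro provides org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(reader, writer).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompatibilitySketch {

    // Hypothetical schema model: field name -> whether the field declares a default.
    // Types, unions, aliases and nesting are deliberately ignored.

    // Can a consumer using `reader` decode data written with `writer`?
    // Every reader field missing from the writer needs a default;
    // writer fields unknown to the reader are skipped.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> f : reader.entrySet()) {
            if (!writer.containsKey(f.getKey()) && !f.getValue()) {
                return false;
            }
        }
        return true;
    }

    static boolean backward(Map<String, Boolean> newer, Map<String, Boolean> older) {
        return canRead(newer, older);   // new readers, old data
    }

    static boolean forward(Map<String, Boolean> newer, Map<String, Boolean> older) {
        return canRead(older, newer);   // old readers, new data
    }

    static boolean full(Map<String, Boolean> newer, Map<String, Boolean> older) {
        return backward(newer, older) && forward(newer, older);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = new LinkedHashMap<>();
        v1.put("id", false);
        v1.put("name", false);
        v1.put("qty", false);

        Map<String, Boolean> v2 = new LinkedHashMap<>(v1);
        v2.put("unit", true);            // added with a default
        v2.put("note", true);            // added with a default

        Map<String, Boolean> v2NoDefault = new LinkedHashMap<>(v1);
        v2NoDefault.put("unit", false);  // added without a default

        // prints "v1 -> v2 full: true"
        System.out.println("v1 -> v2 full: " + full(v2, v1));
        // prints "v1 -> v2NoDefault backward: false, forward: true"
        System.out.println("v1 -> v2NoDefault backward: " + backward(v2NoDefault, v1)
                + ", forward: " + forward(v2NoDefault, v1));
    }
}
```

Full compatibility only guarantees that resolution is possible. A reader still has to be handed the writer's schema for resolution to happen, which fits the thread's diagnosis that old records were being handled with only the latest schema.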
