Ah, this fix is pushed into the KafkaUtils class itself, so we need to rework this more generically. Essentially, we need to know what schema each record was written with in the Avro data on DFS. How do you get the Avro data onto DFS?
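
For context on that question: Avro container files embed the schema each record was written with, so data that lands on DFS as .avro container files already carries its writer schema. A minimal sketch with the plain Avro generic API, reading a file and resolving records against a newer target schema (the file name and latestSchema are illustrative only, not from this thread):

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class ReadWithResolution {
      public static void dump(File avroFile, Schema latestSchema) throws Exception {
        // Passing null as the writer schema tells DataFileReader to use the
        // schema embedded in the file; latestSchema is the reader (target) schema.
        GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<>(null, latestSchema);
        try (DataFileReader<GenericRecord> fileReader =
                 new DataFileReader<>(avroFile, datumReader)) {
          Schema writerSchema = fileReader.getSchema(); // schema the file was written with
          System.out.println("writer schema: " + writerSchema.getFullName());
          for (GenericRecord record : fileReader) {
            // record has already been resolved into latestSchema's shape here
            System.out.println(record);
          }
        }
      }
    }

That per-file writer schema is presumably the piece of information a generic AvroDFSSource-side fix would need to thread through.
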
On Wed, Jul 3, 2019 at 2:37 AM Katie Frost <[email protected]> wrote:

> Hey,
>
> Sorry about the delay! I've tried it out and it doesn't work for us, but
> we're using the AvroDFSSource, not the AvroKafkaSource, so maybe a similar
> thing needs to be done to that class?
>
> Katie
>
> On Wed, 3 Jul 2019 at 01:04, Balaji Varadarajan <[email protected]> wrote:
>
> > Hi Katie,
> >
> > Can you let us know if you are able to get things working with the PRs?
> >
> > Balaji.V
> >
> > On Friday, June 28, 2019, 11:18:28 AM PDT, Vinoth Chandar <
> > [email protected]> wrote:
> >
> > This PR also touches on the same issue.
> > https://github.com/apache/incubator-hudi/pull/765/files
> >
> > On Thu, Jun 27, 2019 at 3:15 AM Katie Frost <[email protected]> wrote:
> >
> > > Hi Balaji,
> > >
> > > Thanks! I'll test out the PR and let you know how it goes.
> > >
> > > On Thu, 27 Jun 2019 at 09:03, [email protected] <[email protected]> wrote:
> > >
> > > > Hi Katie,
> > > >
> > > > Yes, you are correct with your observation.
> > > >
> > > > The problem is that KafkaAvroDecoder does not guarantee that it will
> > > > return generic records deserialized with the latest schema. This
> > > > causes deserialization errors in DeltaStreamer at write time, when
> > > > the input batch has records conforming to old schema versions and
> > > > Hudi tries to deserialize the bytes with the latest schema.
> > > >
> > > > Another user had mentioned this before, and they are in the process
> > > > of submitting a PR. Till then, I have cooked up a temporary PR to
> > > > unblock you. You can look at the PR to get the general idea and fix
> > > > any bugs that you see (I don't have a test setup handy for this).
> > > > https://github.com/apache/incubator-hudi/pull/763
> > > >
> > > > Can you build Hudi with the above PR and see if you are able to
> > > > ingest records with the old schema?
> > > >
> > > > Thanks,
> > > > Balaji.V
> > > >
> > > > On Wednesday, June 26, 2019, 10:52:16 AM PDT, Vinoth Chandar <
> > > > [email protected]> wrote:
> > > >
> > > > Are you using DeltaStreamer with the Confluent Schema Registry? I
> > > > think you read the stack trace right: the Schema Registry may be
> > > > using the latest schema instead of reading each record with the
> > > > schema it was written in. I remember Balaji alluded to a (related?)
> > > > issue around this.. Balaji?
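
To make the writer-vs-latest mismatch described above concrete: bytes encoded with an old writer schema decode safely only when the decoder is handed both the writer schema and the target schema, so Avro can resolve between them. A rough sketch of that resolution step with the generic Avro API (names are illustrative; this is not the code in the PRs linked above):

    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class SchemaResolution {
      // Encode a record with the schema it actually conforms to (the writer schema).
      static byte[] encode(GenericRecord record, Schema writerSchema) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
      }

      // Decode old bytes into the latest schema's shape. Supplying BOTH schemas
      // is what lets Avro fill in defaults for added fields; decoding with only
      // the latest schema is the mismatch that breaks the ingest.
      static GenericRecord decode(byte[] bytes, Schema writerSchema, Schema latestSchema)
          throws Exception {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(writerSchema, latestSchema)
            .read(null, decoder);
      }
    }

In DeltaStreamer terms, this means the source has to track each record's actual writer schema rather than assuming the registry's latest.
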
> > > > On Wed, Jun 26, 2019 at 3:08 AM Katie Frost <[email protected]> wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > 1. I'm using MERGE_ON_READ, but I get the same error regardless of
> > > > > table type.
> > > > >
> > > > > 2. The exception I'm seeing is:
> > > > >
> > > > > 2019-06-26 09:29:46 ERROR HoodieIOHandle:139 - Error writing record
> > > > > HoodieRecord{key=HoodieKey { recordKey=f01ce1af-9566-44dd-babc-4147f72ad531
> > > > > partitionPath=default}, currentLocation='null', newLocation='null'}
> > > > > java.lang.ArrayIndexOutOfBoundsException: 3
> > > > >   at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135)
> > > > >   at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
> > > > >   at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
> > > > >   at org.apache.avro.generic.GenericData.validate(GenericData.java:382)
> > > > >   at org.apache.avro.generic.GenericData.validate(GenericData.java:395)
> > > > >   at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
> > > > >   at com.uber.hoodie.common.util.HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.java:192)
> > > > >   at com.uber.hoodie.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
> > > > >   at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:72)
> > > > >   at com.uber.hoodie.func.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:85)
> > > > >   at com.uber.hoodie.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:175)
> > > > >   at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
> > > > >   at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$startProducers$0(BoundedInMemoryExecutor.java:94)
> > > > >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > >   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > > >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > > > >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > > > >   at java.lang.Thread.run(Thread.java:748)
> > > > >
> > > > > and I'm seeing that error for every record that doesn't conform to
> > > > > the current schema. In the newer schema, 2 extra fields have been
> > > > > added to an array, making 5 elements in the array; in the older
> > > > > schema there are 3 fields in the array. So when it says 'index out
> > > > > of bounds: 3', I am assuming it is expecting the older data to have
> > > > > the extra fields that were added in the later schema.
> > > > >
> > > > > 3. By full compatibility I mean the Avro schema changes are both
> > > > > forward and backward compatible: new data can be read with the
> > > > > older schema, and old data can be read with the newer schema (we're
> > > > > enforcing this using the Confluent Schema Registry). Docs about it
> > > > > can be found here:
> > > > > https://docs.confluent.io/current/schema-registry/avro.html
> > > > >
> > > > > Thanks,
> > > > > Katie
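
Katie's reading of 'index out of bounds: 3' lines up with how Avro's generic API behaves: GenericData.Record.get is positional, so validating a record built against a 3-field schema with a 5-field schema reaches for a field position the record doesn't have. A small sketch that reproduces the same exception shape, with invented stand-in schemas rather than the real ones from this thread:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecordBuilder;

    public class ValidateMismatch {
      public static void main(String[] args) {
        // Old writer schema: 3 fields (positions 0..2).
        Schema oldSchema = SchemaBuilder.record("Event").fields()
            .requiredString("a").requiredString("b").requiredString("c")
            .endRecord();
        // New reader schema: 5 fields (positions 0..4), added ones with defaults.
        Schema newSchema = SchemaBuilder.record("Event").fields()
            .requiredString("a").requiredString("b").requiredString("c")
            .name("d").type().stringType().stringDefault("")
            .name("e").type().stringType().stringDefault("")
            .endRecord();

        GenericData.Record oldRecord = new GenericRecordBuilder(oldSchema)
            .set("a", "1").set("b", "2").set("c", "3").build();

        // Throws java.lang.ArrayIndexOutOfBoundsException: 3, because
        // validation asks for field position 3 in a record that only has
        // slots 0..2 backing it.
        GenericData.get().validate(newSchema, oldRecord);
      }
    }

In the real trace the failure surfaces inside nested validate calls (the record sits under an array), but the root cause is the same positional lookup.
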
> > > > > On Tue, 25 Jun 2019 at 19:11, nishith agarwal <[email protected]> wrote:
> > > > >
> > > > > > Hi Katie,
> > > > > >
> > > > > > Thanks for explaining the problem in detail. Could you give us
> > > > > > some more information before I can help you with this?
> > > > > >
> > > > > > 1. What table type are you using - COPY_ON_WRITE or MERGE_ON_READ?
> > > > > > 2. Could you paste the exception you see in Hudi?
> > > > > > 3. "Despite the schema having full compatibility" -> Can you
> > > > > > explain what you mean by "full compatibility"?
> > > > > >
> > > > > > Thanks,
> > > > > > Nishith
> > > > > >
> > > > > > On Tue, Jun 25, 2019 at 10:32 AM Katie Frost <[email protected]> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I've been using the Hudi DeltaStreamer to create datasets in S3,
> > > > > > > and I've had issues with Hudi acknowledging schema compatibility.
> > > > > > >
> > > > > > > I'm trying to run a Spark job ingesting Avro data into a Hudi
> > > > > > > dataset in S3, with the raw Avro source data also stored in S3.
> > > > > > > The raw Avro data has two different schema versions, and I have
> > > > > > > supplied the job with the latest schema. However, the job fails
> > > > > > > to ingest any of the data that is not up to date with the latest
> > > > > > > schema and ingests only the data matching the given schema,
> > > > > > > despite the schema having full compatibility. Is this a known
> > > > > > > issue, or just a case of missing some configuration?
> > > > > > >
> > > > > > > The error I get when running the job to ingest the data not up
> > > > > > > to date with the latest Avro schema is an array index out of
> > > > > > > bounds exception, and I know it is a schema issue as I have
> > > > > > > tested running the job with the older schema version, removing
> > > > > > > any data that matches the latest schema, and the job runs
> > > > > > > successfully.
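
On the "full compatibility" claim in the original message: full compatibility means the newer schema can read old data (backward) and the older schema can read new data (forward), which for added fields requires defaults. That property can be checked directly with Avro's SchemaCompatibility, independent of the registry. A sketch reusing the same invented stand-in schemas as above:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.SchemaCompatibility;
    import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

    public class FullCompatibilityCheck {
      public static void main(String[] args) {
        Schema v1 = SchemaBuilder.record("Event").fields()
            .requiredString("a").requiredString("b").requiredString("c")
            .endRecord();
        // v2 adds two fields WITH defaults, which is what keeps both
        // directions of the check legal.
        Schema v2 = SchemaBuilder.record("Event").fields()
            .requiredString("a").requiredString("b").requiredString("c")
            .name("d").type().stringType().stringDefault("")
            .name("e").type().stringType().stringDefault("")
            .endRecord();

        // Backward: new schema (reader) can read old data (writer).
        SchemaCompatibilityType backward = SchemaCompatibility
            .checkReaderWriterCompatibility(v2, v1).getType();
        // Forward: old schema (reader) can read new data (writer).
        SchemaCompatibilityType forward = SchemaCompatibility
            .checkReaderWriterCompatibility(v1, v2).getType();

        // FULL compatibility == both report COMPATIBLE.
        System.out.println("backward=" + backward + ", forward=" + forward);
      }
    }

Note that compatible schemas don't by themselves fix the ingest: the resolution still has to be applied with the correct writer schema, which is the bug being discussed in this thread.
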
