[ 
https://issues.apache.org/jira/browse/HUDI-1906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nishith Agarwal updated HUDI-1906:
----------------------------------
    Labels: schema-evolution sev:high  (was: sev:high)

> Deltastreamer/SparkDatasource ingestion breaks when changing target schema 
> provider options
> -------------------------------------------------------------------------------------------
>
>                 Key: HUDI-1906
>                 URL: https://issues.apache.org/jira/browse/HUDI-1906
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: DeltaStreamer
>            Reporter: Nishith Agarwal
>            Priority: Major
>              Labels: schema-evolution, sev:high
>
> Currently, users can provide a target schema in a few different ways, such 
> as a file-based schema provider or a schema registry. At a high level, there 
> are 2 main flows:
>  # Target schema is provided by the user
>  # Target schema is not provided by the user (and is then inferred from the 
> incoming data)
>  
> ||Schema post processor enabled||Transformers||User provided target schema||Current behavior||
> |yes|no|yes|Table schema has no namespace. Matches user provided schema.|
> |yes|yes|no|Had to make a minor fix in the post processor for an NPE. With the fix, the table schema has a namespace.|
> |yes|yes|yes|Table schema has a namespace.|
> |no|no|yes|Table schema has no namespace. Matches user provided schema.|
> |no|yes|yes|Table schema has no namespace. Matches user provided schema.|
> |no|yes|no|Table schema has a namespace.|
>  
> Source -> [https://github.com/apache/hudi/pull/2937]
> As you can see above, if one switches from a non-user-provided schema flow to 
> a user-provided-schema flow, we switch from namespace in schema to no 
> namespace in schema. 
> Parquet does not store the namespace, so the parquet-avro writer and reader 
> do not complain when moving between avro schemas with and without a 
> namespace. 
> However, for MergeOnRead tables, we serialize both data and schema in the log 
> blocks. The GenericDatumReader, which takes a writer schema and a reader 
> schema and translates between them, breaks when one schema has a namespace 
> and the other doesn't. 
>  
> The following exception is thrown 
> {noformat}
> 51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got exception when reading log file
> org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
>       at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>       at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>       at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
>       at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
>       at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
>       at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>       at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>       at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>       at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>       at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
>       at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
>       at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
>       at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
>       at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
>       at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
>       at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
>       at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
>       at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
>       at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
>       at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> This seems like an Avro shortcoming. We need a way to keep the decoding of 
> avro data in log files from breaking when the user switches between schema 
> provider options. One way is to implement a custom GenericDatumReader. 
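
The namespace mismatch described above can be illustrated with a small, library-free sketch. It walks two parsed Avro schema JSON documents and lists the named types (record, enum, fixed) whose full names differ. Everything in it is an assumption made for illustration: the helper names full_names and name_mismatches are hypothetical, the two schemas are reconstructed from the names in the exception message, and the fixed size, precision, and scale are made up. It is not Hudi or Avro code, and it does not determine which of the differences a given Avro version rejects.

```python
import json

NAMED_TYPES = {"record", "enum", "fixed"}


def full_names(schema, enclosing_ns=None, found=None):
    """Collect full names of named types, in definition order.

    Follows the Avro naming rule: a dotted name is already a full name,
    otherwise the type's own "namespace" applies, otherwise the namespace
    of the enclosing named type.
    """
    if found is None:
        found = []
    if isinstance(schema, list):  # union: visit every branch
        for branch in schema:
            full_names(branch, enclosing_ns, found)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind in NAMED_TYPES:
            name = schema["name"]
            if "." in name:
                full = name
            else:
                ns = schema.get("namespace", enclosing_ns)
                full = f"{ns}.{name}" if ns else name
            found.append(full)
            inner_ns = full.rpartition(".")[0] or None
            for field in schema.get("fields", []):
                full_names(field["type"], inner_ns, found)
        elif kind == "array":
            full_names(schema["items"], enclosing_ns, found)
        elif kind == "map":
            full_names(schema["values"], enclosing_ns, found)
        elif isinstance(kind, (dict, list)):  # nested type definition
            full_names(kind, enclosing_ns, found)
    return found


def name_mismatches(writer, reader):
    """Pair up named types positionally and keep the pairs that differ.

    Assumes both schemas have the same shape and differ only in naming.
    """
    pairs = zip(full_names(writer), full_names(reader))
    return [(w, r) for w, r in pairs if w != r]


# Hypothetical schema as inferred from incoming data (carries namespaces).
WITH_NAMESPACE = json.loads("""
{"type": "record", "name": "hoodie_source", "namespace": "hoodie.source",
 "fields": [
   {"name": "height",
    "type": {"type": "fixed", "name": "fixed",
             "namespace": "hoodie.source.hoodie_source.height",
             "size": 5, "logicalType": "decimal",
             "precision": 10, "scale": 6}}]}
""")

# Hypothetical user-provided schema for the same table (no namespaces).
WITHOUT_NAMESPACE = json.loads("""
{"type": "record", "name": "hoodie_source",
 "fields": [
   {"name": "height",
    "type": {"type": "fixed", "name": "fixed", "size": 5,
             "logicalType": "decimal", "precision": 10, "scale": 6}}]}
""")

if __name__ == "__main__":
    for written, expected in name_mismatches(WITH_NAMESPACE, WITHOUT_NAMESPACE):
        print(f"Found {written}, expecting {expected}")
```

Running the script prints one line per differing name. The pair for the fixed type mirrors the names in the AvroTypeException above. A check like this could run before a schema provider change is rolled out, to flag tables whose existing log blocks were written with differently named types.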



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
