[ 
https://issues.apache.org/jira/browse/HUDI-1906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nishith Agarwal updated HUDI-1906:
----------------------------------
    Description: 
Currently, there are few different options to the user to provide target 
schemas such as file based, schema registry. At a high level, there are 2 main 
flows 
 # Target Schema is provided by the user
 # Target schema is not provided by the user (which is then inferred from the 
incoming data)

 
||Schema post processor enabled||Transformers||User provided target schema||Cur 
behavior||
|yes|No|Yes|table schema's has no namespace. matches user provided schema|
|yes|yes|No|had to make minor fix in post processor for NPE. with the fix, 
table schema has namespace in it.|
|yes|yes|yes|table schema has namespace|
|no|no|yes|table schema's has no namespace. matches user provided schema|
|no|yes|yes|table schema's has no namespace. matches user provided schema|
|no|yes|no|table's schema has namespace.|

 

Source -> [https://github.com/apache/hudi/pull/2937]

As you can see above, if one switches from a non-user-provided schema flow to a 
user-provided-schema flow, we switch from namespace in schema to no namespace 
in schema. 

Parquet does not store the namespace, so when moving across avro schemas with 
and without namespace, the parquet-avro writer or reader does not complain 
since parquet itself does not store namespace. 

However, for MergeOnRead tables, we serialize data and schema in the log 
blocks. The GenericDatumReader that takes a reader & writer schema to translate 
breaks when one schema has namespace while the other doesn't. 

 

The following exception is thrown 
{noformat}
51511 [Executor task launch worker for task 502] ERROR 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got 
exception when reading log file
org.apache.avro.AvroTypeException: Found 
hoodie.source.hoodie_source.height.fixed, expecting fixed
        at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at 
org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
        at 
org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
        at 
org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at 
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
        at 
org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
        at 
org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at 
org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
        at 
org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
        at 
org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
        at 
org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748{noformat}
 

  was:
Currently, there are few different options to the user to provide target 
schemas such as file based, schema registry. At a high level, there are 2 main 
flows 


 # Target Schema is provided by the user
 # Target schema is not provided by the user (which is then inferred from the 
incoming data)

 
||Schema post processor enabled||Transformers||User provided target schema||Cur 
behavior||
|yes|No|Yes|table schema's has no namespace. matches user provided schema|
|yes|yes|No|had to make minor fix in post processor for NPE. with the fix, 
table schema has namespace in it.|
|yes|yes|yes|table schema has namespace|
|no|no|yes|table schema's has no namespace. matches user provided schema|
|no|yes|yes|table schema's has no namespace. matches user provided schema|
|no|yes|no|table's schema has namespace.|

 

Source -> [https://github.com/apache/hudi/pull/2937]

As you can see above, if one switches from a non-user-provided schema flow to a 
user-provided-schema flow, we switch from namespace in schema to no namespace 
in schema. 

Parquet does not store the namespace, so when moving across avro schemas with 
and without namespace, the parquet-avro writer or reader does not complain 
since parquet itself does not store namespace. 

However, for MergeOnRead tables, we serialize data and schema in the log 
blocks. The GenericDatumReader that takes a reader & writer schema to translate 
breaks when one schema has namespace while the other doesn't. 

 

The following exception is thrown 
{noformat}
51511 [Executor task launch worker for task 502] ERROR 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got 
exception when reading log file
org.apache.avro.AvroTypeException: Found 
hoodie.source.hoodie_source.height.fixed, expecting fixed
        at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at 
org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
        at 
org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
        at 
org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at 
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
        at 
org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
        at 
org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
        at 
org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
        at 
org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
        at 
org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
        at 
org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748{noformat}


> Deltastreamer/SparkDatasource ingestion breaks when changing target schema 
> provider options
> -------------------------------------------------------------------------------------------
>
>                 Key: HUDI-1906
>                 URL: https://issues.apache.org/jira/browse/HUDI-1906
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: DeltaStreamer
>            Reporter: Nishith Agarwal
>            Priority: Major
>
> Currently, there are few different options to the user to provide target 
> schemas such as file based, schema registry. At a high level, there are 2 
> main flows 
>  # Target Schema is provided by the user
>  # Target schema is not provided by the user (which is then inferred from the 
> incoming data)
>  
> ||Schema post processor enabled||Transformers||User provided target 
> schema||Cur behavior||
> |yes|No|Yes|table schema's has no namespace. matches user provided schema|
> |yes|yes|No|had to make minor fix in post processor for NPE. with the fix, 
> table schema has namespace in it.|
> |yes|yes|yes|table schema has namespace|
> |no|no|yes|table schema's has no namespace. matches user provided schema|
> |no|yes|yes|table schema's has no namespace. matches user provided schema|
> |no|yes|no|table's schema has namespace.|
>  
> Source -> [https://github.com/apache/hudi/pull/2937]
> As you can see above, if one switches from a non-user-provided schema flow to 
> a user-provided-schema flow, we switch from namespace in schema to no 
> namespace in schema. 
> Parquet does not store the namespace, so when moving across avro schemas with 
> and without namespace, the parquet-avro writer or reader does not complain 
> since parquet itself does not store namespace. 
> However, for MergeOnRead tables, we serialize data and schema in the log 
> blocks. The GenericDatumReader that takes a reader & writer schema to 
> translate breaks when one schema has namespace while the other doesn't. 
>  
> The following exception is thrown 
> {noformat}
> 51511 [Executor task launch worker for task 502] ERROR 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got 
> exception when reading log file
> org.apache.avro.AvroTypeException: Found 
> hoodie.source.hoodie_source.height.fixed, expecting fixed
>       at 
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
>       at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>       at 
> org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
>       at 
> org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
>       at 
> org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
>       at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>       at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>       at 
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>       at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>       at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>       at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>       at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>       at 
> org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
>       at 
> org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
>       at 
> org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
>       at 
> org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
>       at 
> org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
>       at 
> org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
>       at 
> org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
>       at 
> org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
>       at 
> org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748{noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to