phantomcoder62 opened a new issue, #8040:
URL: https://github.com/apache/hudi/issues/8040

   **Problem**
   
   We are testing schema evolution on a MOR Hudi table. Adding a new column works, but when we drop a column or change its type, the write fails with this error:
   "py4j.protocol.Py4JJavaError: An error occurred while calling o130.save.
   org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'deleted_column_name' not found
           at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)"
   
   We investigated further because the error appeared only occasionally. Once it appears, however, it recurs on every subsequent write operation against the Hudi MOR table, and we can no longer write anything to that table.
   
   Our finding is that schema evolution may work fine until compaction takes place. When compaction runs on the MOR table, it reads some files that still contain the deleted column (or the old data type) and others that do not, and that mismatch causes this issue. You can test this by setting the compaction, min/max commit, and cleaner commits-retained configurations to low values on a MOR table.
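
A minimal sketch of how the drift described above could be detected before a write, assuming flat schemas in the `df.schema.simpleString()` format printed by the job. The helper names `parse_simple_string` and `diff_schemas` are hypothetical and are not part of Hudi or Spark; nested types and type names that contain commas (for example `decimal(10,2)`) are not handled.

```python
import re


def parse_simple_string(schema: str) -> dict:
    """Parse a flat Spark simpleString such as 'struct<id:int,name:string>'
    into a {column_name: type_name} dict. Assumes no nested types."""
    match = re.fullmatch(r"struct<(.*)>", schema.strip())
    if match is None:
        raise ValueError(f"not a struct schema: {schema!r}")
    fields = {}
    for part in match.group(1).split(","):
        if not part:
            continue  # tolerate an empty struct
        name, _, dtype = part.partition(":")
        fields[name.strip()] = dtype.strip()
    return fields


def diff_schemas(table_schema: str, incoming_schema: str) -> dict:
    """Report columns added, dropped, or retyped between two flat schemas."""
    old = parse_simple_string(table_schema)
    new = parse_simple_string(incoming_schema)
    return {
        "added": sorted(set(new) - set(old)),
        "dropped": sorted(set(old) - set(new)),
        "type_changed": {
            name: (old[name], new[name])
            for name in old
            if name in new and old[name] != new[name]
        },
    }


if __name__ == "__main__":
    # Schemas taken from the reproduction steps in this report.
    before = "struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>"
    after = "struct<Op:string,cdc_timestamp:string,id:int,name:string>"
    print(diff_schemas(before, after))
```

A job could run such a check against the schema of the previous batch and stop, or log a warning, when `dropped` or `type_changed` is non-empty, instead of discovering the mismatch only when compaction reads the older files.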
   
   **To Reproduce**
   
   This is the code snippet. We are using Glue 4.0 and Hudi version 0.12.1 (we also tried on EC2).
   
   import sys
   from awsglue.transforms import *
   from awsglue.utils import getResolvedOptions
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from pyspark.sql.session import SparkSession
   
   args = getResolvedOptions(sys.argv, ['JOB_NAME','schema_evolution_testing'])
   
   # Build the Spark session with the serializer and Hive settings used for Hudi
   spark = (
       SparkSession.builder
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
       .config('spark.sql.hive.convertMetastoreParquet', 'false')
       .getOrCreate()
   )
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   job.init(args['JOB_NAME'], args)
   
   def main():    
       # HUDI configuration
       commonOptions = {                                               
           'hoodie.upsert.shuffle.parallelism': 200,
           'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
           'hoodie.datasource.write.operation': 'upsert',
           'hoodie.compact.inline': 'true', 
           'hoodie.compact.inline.max.delta.commits': 5,
           'hoodie.keep.max.commits': 5,
           'hoodie.keep.min.commits': 4,
           'hoodie.datasource.write.precombine.field': 'cdc_timestamp',
           'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
           'hoodie.cleaner.commits.retained': 3,
           'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
       }
       tableConfig = {                                             
               "hoodie.table.name": 'emp',
               "hoodie.datasource.write.recordkey.field": 'id',
               'hoodie.datasource.write.table.name': 'emp'
       }
       CombinedConfig = {**tableConfig, **commonOptions}
       
       # Reading Parquet files from raw S3 DMS output
       df = spark.read.parquet('s3://bucket/folder/emp/')
       print("df schema is:",df.schema.simpleString())
       df.show()
       
       # Writing into hudi
       (
           df.write.format("org.apache.hudi")
           .options(**CombinedConfig)
           .mode("append")
           .save(f"s3://hudi_bucket/hudi_folder/emp")
       )
       print("Upsertion into HUDI completed")
   
   main()
   job.commit()
   
   These are the steps we followed to test this issue:
   
   **Column Type Changed :**
   run 1 -
   Created the Hudi table emp and inserted 4000 records with schema struct<Op:string,cdc_timestamp:string,id:int,name:string>. This job succeeded and created a parquet file in the Hudi table.
   
   run 2 -
   Altered the emp table to add the column mgr_id (integer) and updated 4000 records. The new schema is struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:int>. This job succeeded and created a .log file in the Hudi table.
   
   run 3 -
   Altered the emp table to change the type of the mgr_id column from integer to character varying, then updated 3000 records and inserted 5 new records (with mgr_id). The new schema is struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>. This job succeeded and created another .log file and another parquet file in the Hudi table.
   
   run 4 -
   Updated one record in the DB. This job succeeded and created another .log file.
   
   run 5 -
   Updated one record in the DB. This job failed with the error: An error occurred while calling o120.save. Found int, expecting union
   
   Error log -
     File "/tmp/schema_evolution_testing.py", line 51, in main
       .save(hudi_table_path)
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
       self._jwrite.save(path)
     File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
       return_value = get_return_value(
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
       return f(*a, **kw)
     File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o120.save.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 185) (172.34.225.30 executor 4): org.apache.hudi.exception.HoodieException: Exception when reading log file
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
        at 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
        at 
org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198)
        at 
org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138)
        at 
org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
        at 
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
        at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.avro.AvroTypeException: Found int, expecting union
        at 
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
        at 
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at 
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at 
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at 
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at 
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at 
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:207)
        at 
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:144)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:382)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464)
        at 
org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
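
The `Found int, expecting union` failure above is consistent with Avro's schema-resolution rules, under which an `int` written earlier can be promoted to `long`, `float`, or `double` but not to `string`. The sketch below is a simplified static check of those promotion rules for primitives and unions of primitives only. It is an illustration, not Hudi's or Avro's implementation; the function names are hypothetical, and representing the nullable columns as `["null", "int"]` and `["null", "string"]` is an assumption, since the trace does not show the actual writer and reader schemas.

```python
# Writer-to-reader promotions permitted by the Avro specification.
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def primitive_resolves(writer: str, reader: str) -> bool:
    """True if data written as `writer` can be read as `reader`."""
    return writer == reader or reader in PROMOTIONS.get(writer, set())


def resolves(writer, reader) -> bool:
    """Conservative check for primitives (str) and unions (list of str).

    Real Avro resolves a writer union per datum; here every writer branch
    must be readable, which flags any type change that could fail.
    """
    if isinstance(writer, list):
        return all(resolves(branch, reader) for branch in writer)
    if isinstance(reader, list):
        return any(primitive_resolves(writer, branch) for branch in reader)
    return primitive_resolves(writer, reader)


if __name__ == "__main__":
    # mgr_id as written in run 2 (int) versus as expected after run 3 (string).
    print(resolves(["null", "int"], ["null", "string"]))
    # A widening change such as int to long would pass the same check.
    print(resolves(["null", "int"], ["null", "long"]))
```

Under these rules, log blocks written while mgr_id was an int cannot be decoded with a schema that expects a string, which matches the point at which compaction fails while scanning the log files.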
   
   
   **Column dropped :**
   run 1 -
   Created the Hudi table emp and inserted 35 records with schema struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>. This job succeeded and created a parquet file in the Hudi table bucket.
   
   run 2 -
   Altered the emp table to drop the column mgr_id and updated 35 records. The new schema is struct<Op:string,cdc_timestamp:string,id:int,name:string>. This job created 4 new parquet files in the Hudi table bucket and then failed with the error: An error occurred while calling o120.save. Parquet/Avro schema mismatch: Avro field 'mgr_id' not found.
   Error log -
     File "/tmp/schema_evolution_testing.py", line 51, in main
       .save(hudi_table_path)
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
       self._jwrite.save(path)
     File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
       return_value = get_return_value(
     File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
       return f(*a, **kw)
     File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o120.save.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 120) (172.36.30.113 executor 5): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
        at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
        at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
        at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
        at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieException: 
org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: operation has failed
        at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
        at 
org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:80)
        at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   Caused by: org.apache.hudi.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: operation has failed
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
        at 
org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
   Caused by: java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: operation has failed
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
   Caused by: org.apache.hudi.exception.HoodieException: operation has failed
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   Caused by: org.apache.hudi.exception.HoodieException: unable to read next 
record from parquet file 
        at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
        at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
        at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema 
mismatch: Avro field 'mgr_id' not found
        at 
org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
        at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
        at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
        at 
org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at 
org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
        at 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
        at 
org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at 
org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
   
   This script behaves the same way on AWS EC2, where our production code runs on the same Hudi version, 0.12.1.
   
   
   **Environment Description**
   
   * Hudi version : 0.12.1
   
   * Spark version : 3.3
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Platform : Glue 4.0 and EC2
   
   * Running on Docker? (yes/no) : No

