phantomcoder62 opened a new issue, #8040:
URL: https://github.com/apache/hudi/issues/8040
**Problem**
We were testing schema evolution on our MOR Hudi table. Adding a new column works, but when we drop a column or change its type, writes fail with this error:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o130.save.
org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'deleted_column_name' not found
	at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
```

We researched this further because the error only appeared occasionally. However, once it appears, it persists for every subsequent write operation on the Hudi MOR table, and we can no longer write anything to that table.

Our finding is that schema evolution may work fine until compaction takes place. When compaction runs on the MOR table, it picks up some files that still contain the deleted column (or the old data type) and some that do not, and that mismatch causes this error. You can reproduce this by setting the compaction, min/max commit, and cleaner commits retained configurations to low values on a MOR table.
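For reference, here is a minimal sketch of what we mean by "low values" for those settings. The option names are the same Hudi writer configs used in the reproduction script below; the specific numbers are only illustrative and are not our exact production values.

```python
# Illustrative only: aggressive compaction/cleaning settings so the schema
# mismatch surfaces after just a few commits on a small test table.
aggressive_mor_options = {
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.compact.inline': 'true',
    'hoodie.compact.inline.max.delta.commits': 2,  # compact after every 2 delta commits
    'hoodie.keep.min.commits': 3,
    'hoodie.keep.max.commits': 4,
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 2,
}
```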
**To Reproduce**
This is the code snippet. We are using Glue 4.0 with Hudi version 0.12.1 (we also tried it on EC2).
```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.session import SparkSession

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'schema_evolution_testing'])
spark = (
    SparkSession.builder
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.sql.hive.convertMetastoreParquet', 'false')
    .getOrCreate()
)
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


def main():
    # HUDI configuration
    commonOptions = {
        'hoodie.upsert.shuffle.parallelism': 200,
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.compact.inline': 'true',
        'hoodie.compact.inline.max.delta.commits': 5,
        'hoodie.keep.max.commits': 5,
        'hoodie.keep.min.commits': 4,
        'hoodie.datasource.write.precombine.field': 'cdc_timestamp',
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': 3,
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
    }
    tableConfig = {
        'hoodie.table.name': 'emp',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.table.name': 'emp'
    }
    CombinedConfig = {**tableConfig, **commonOptions}

    # Reading Parquet files from raw S3 DMS output
    df = spark.read.parquet('s3://bucket/folder/emp/')
    print("df schema is:", df.schema.simpleString())
    df.show()

    # Writing into Hudi
    (
        df.write.format("org.apache.hudi")
        .options(**CombinedConfig)
        .mode("append")
        .save("s3://hudi_bucket/hudi_folder/emp")
    )
    print("Upsertion into HUDI completed")


main()
job.commit()
```
These are the steps we followed for testing this issue -
**Column type changed:**

Run 1 - Created the Hudi table emp and inserted 4000 records with schema struct<Op:string,cdc_timestamp:string,id:int,name:string>. This job was successful and created a Parquet file in the Hudi table.

Run 2 - Altered the emp table and added an integer column mgr_id, then updated 4000 records. The new schema is struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:int>. This job was successful and created a .log file in the Hudi table.

Run 3 - Altered the emp table and changed the type of mgr_id from integer to character varying, then updated 3000 records and inserted 5 records with mgr_id values. The new schema is struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>. This job was successful and created another .log file and another Parquet file in the Hudi table.

Run 4 - Updated one record in the DB. This job was successful and created another .log file.

Run 5 - Updated one record in the DB. This job failed with the error: "An error occurred while calling o120.save. Found int, expecting union".
Error log -
```
  File "/tmp/schema_evolution_testing.py", line 51, in main
    .save(hudi_table_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o120.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 185) (172.34.225.30 executor 4): org.apache.hudi.exception.HoodieException: Exception when reading log file
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
	at org.apache.hudi.table.action.compact.HoodieCompactor.compact(HoodieCompactor.java:198)
	at org.apache.hudi.table.action.compact.HoodieCompactor.lambda$compact$57154431$1(HoodieCompactor.java:138)
	at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:455)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:480)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:486)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.avro.AvroTypeException: Found int, expecting union
	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
	at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:207)
	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock$RecordIterator.next(HoodieAvroDataBlock.java:144)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processDataBlock(AbstractHoodieLogRecordReader.java:382)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:464)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
```
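To make run 3 concrete, here is a hedged sketch (with placeholder S3 paths) of how the int-to-string type change can be simulated in PySpark before writing with the same options as the script above. In our real pipeline the change comes from the DMS output rather than an explicit cast.

```python
from pyspark.sql import functions as F

# Hypothetical illustration of run 3: the incoming batch carries mgr_id as a string,
# while earlier base/log files in the MOR table still hold it as an int.
df = spark.read.parquet('s3://bucket/folder/emp/')             # placeholder source path
df = df.withColumn('mgr_id', F.col('mgr_id').cast('string'))   # int -> string type change

(
    df.write.format("org.apache.hudi")
    .options(**CombinedConfig)   # same writer options as the script above
    .mode("append")
    .save("s3://hudi_bucket/hudi_folder/emp")                  # placeholder table path
)
```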
**Column dropped:**

Run 1 - Created the Hudi table emp and inserted 35 records with schema struct<Op:string,cdc_timestamp:string,id:int,name:string,mgr_id:string>. This job was successful and created a Parquet file in the Hudi table bucket.

Run 2 - Altered the emp table and dropped the column mgr_id, then updated 35 records. The new schema is struct<Op:string,cdc_timestamp:string,id:int,name:string>. This job created 4 new Parquet files in the Hudi table bucket and then failed with the error: "An error occurred while calling o120.save. Parquet/Avro schema mismatch: Avro field 'mgr_id' not found".
Error log -
```
  File "/tmp/schema_evolution_testing.py", line 51, in main
    .save(hudi_table_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o120.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 120) (172.36.30.113 executor 5): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:80)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'mgr_id' not found
	at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:185)
	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
```
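Similarly, a hedged sketch (placeholder paths again) of the column-drop case: the incoming batch simply no longer carries mgr_id, while the existing Parquet base files in the table still do.

```python
# Hypothetical illustration of the "column dropped" runs: mgr_id was removed upstream,
# so the new batch is written without it.
df = spark.read.parquet('s3://bucket/folder/emp/')   # placeholder source path
df = df.drop('mgr_id')                               # column dropped upstream

(
    df.write.format("org.apache.hudi")
    .options(**CombinedConfig)   # same writer options as the script above
    .mode("append")
    .save("s3://hudi_bucket/hudi_folder/emp")        # placeholder table path
)
```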
This script shows the same behavior on AWS EC2, where our production code runs on the same Hudi version 0.12.1.
**Environment Description**
* Hudi version : 0.12.1
* Spark version : 3.3
* Storage (HDFS/S3/GCS..) : S3
* Platform : Glue 4.0 and EC2
* Running on Docker? (yes/no) : No