Haitham Eltaweel created HUDI-7412:
--------------------------------------

             Summary: OOM error after upgrade to hudi 0.13 when writing big 
file (stream or batch job)
                 Key: HUDI-7412
                 URL: https://issues.apache.org/jira/browse/HUDI-7412
             Project: Apache Hudi
          Issue Type: Bug
          Components: spark
         Environment: Amazon EMR version emr-6.11.1
Spark version 3.3.2 
Hive version 3.1.3
Hadoop version 3.3.3
hudi version 0.13
            Reporter: Haitham Eltaweel


After upgrading from Hudi 0.11 to Hudi 0.13, big records (larger than 200 MB) 
cannot be written to the destination location due to an OOM error, even after 
increasing Spark executor and driver memory.

Error details: java.lang.OutOfMemoryError: Java heap space.

The error never occurred when running the same job on Hudi 0.11.


Use case details:
Read one JSON file containing a single 900 MB record from the S3 source 
location, transform the DataFrame, then write the output DataFrame to the S3 
target location. With the upsert Hudi operation, the error happens in the 
tagging job (mapToPair at HoodieJavaRDD.java:135); with the insert operation, 
it happens in the "Building workload profile" job. The error occurs whether 
the job runs as a Spark Structured Streaming job or as a batch job.
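For anyone trying to reproduce, a small helper along these lines can generate a single-record JSON file of a chosen size. The helper name and the "payload" filler field are my own illustration; the "_id" and "metadata.createdDateTime" fields mirror what the job below extracts with get_json_object, and the reported failure involved a ~900 MB record:

```python
import json
import os


def write_single_record_json(path: str, payload_bytes: int) -> int:
    """Write one JSON file containing a single record and return its size.

    '_id' and 'metadata.createdDateTime' mirror the fields the job extracts
    with get_json_object; 'payload' is illustrative filler used to inflate
    the record to roughly payload_bytes (the reported case used ~900 MB).
    """
    record = {
        "_id": "rec-1",
        "metadata": {"createdDateTime": "2024-01-01T00:00:00Z"},
        "payload": "x" * payload_bytes,  # filler to reach the target size
    }
    with open(path, "w") as fh:
        json.dump(record, fh)
    return os.path.getsize(path)
```

Pointing the snippet below at a directory containing one such file (with a size large enough to trigger the failure) should reproduce the single oversized row produced by wholetext reads.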


The batch job code snippet is shared below; some values are obfuscated.



from pyspark.sql import functions as f
from pyspark.sql import SparkSession


def main():

    hudi_options = {
        'hoodie.table.name': 'hudi_streaming_reco',
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.table.name': 'hudi_streaming_reco',
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
        'hoodie.datasource.write.recordkey.field': 'id',
        'hoodie.datasource.write.precombine.field': 'ts',
        'hoodie.datasource.write.partitionpath.field': 'insert_hr:SIMPLE',
        'hoodie.embed.timeline.server': False,
        'hoodie.index.type': 'SIMPLE',
        'hoodie.parquet.compression.codec': 'snappy',
        'hoodie.clean.async': True,
        'hoodie.parquet.max.file.size': 125829120,
        'hoodie.parquet.small.file.limit': 104857600,
        'hoodie.parquet.block.size': 125829120,
        'hoodie.metadata.enable': True,
        'hoodie.metadata.validate': True,
        'hoodie.datasource.write.hive_style_partitioning': True,
        'hoodie.datasource.hive_sync.support_timestamp': True,
        'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://xxxxxx:xxxxx',
        'hoodie.datasource.hive_sync.username': 'xxxxxxx',
        'hoodie.datasource.hive_sync.password': 'xxxxxxx',
        'hoodie.datasource.hive_sync.database': 'xxxxxxx',
        'hoodie.datasource.hive_sync.table': 'hudi_streaming_reco',
        'hoodie.datasource.hive_sync.partition_fields': 'insert_hr',
        'hoodie.datasource.hive_sync.enable': True,
        'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
    }

    spark = SparkSession.builder.getOrCreate()

    input_path = "s3://xxxxxxx/"

    # wholetext=True loads each input file as a single row, so a 900 MB
    # file becomes one 900 MB 'value' string.
    transformedDF = (
        spark
        .read
        .text(input_path, wholetext=True)
        .select(
            f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
            f.col("value").alias("raw_data"),
            f.get_json_object(f.col("value"), "$._id").alias("id"),
            f.get_json_object(f.col("value"), "$.metadata.createdDateTime").alias("ts"),
            f.input_file_name().alias("input_file_name"),
        )
    )

    s3_output_path = "s3://xxxxxxx/"
    transformedDF \
        .write.format("hudi") \
        .options(**hudi_options) \
        .option('hoodie.datasource.write.operation', 'upsert') \
        .save(s3_output_path, mode='append')


if __name__ == "__main__":
    main()
 
The spark-submit command used (spark.jars.extensions and the unitless Kryo 
buffer value in the original have been corrected to spark.sql.extensions and 
512m):
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryoserializer.buffer.max=512m \
  --num-executors 5 --executor-cores 3 \
  --executor-memory 10g --driver-memory 30g \
  --name big_file_batch --queue casualty \
  big_record_test.py



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
