[
https://issues.apache.org/jira/browse/HUDI-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Y Ethan Guo updated HUDI-7412:
------------------------------
Fix Version/s: 1.0.2
> OOM error after upgrade to hudi 0.13 when writing big record (stream or batch
> job)
> ----------------------------------------------------------------------------------
>
> Key: HUDI-7412
> URL: https://issues.apache.org/jira/browse/HUDI-7412
> Project: Apache Hudi
> Issue Type: Bug
> Components: spark
> Environment: Amazon EMR version emr-6.11.1
> Spark version 3.3.2
> Hive version 3.1.3
> Hadoop version 3.3.3
> hudi version 0.13
> Reporter: Haitham Eltaweel
> Priority: Major
> Fix For: 1.0.2
>
> Attachments: image-2024-02-15-11-35-19-156.png
>
>
> After upgrading from hudi 0.11 to hudi 0.13, big records (larger than 200 MB)
> cannot be written to the destination location due to an OOM error, even after
> increasing the Spark memory resources.
> Error details: java.lang.OutOfMemoryError: Java heap space.
> The error never happened when running the same job with hudi 0.11.
> Find the use case details below:
> Read one JSON file containing a single 900 MB record from the S3 source
> location, transform the DataFrame, then write the output DataFrame to the S3
> target location. With the upsert operation, the error happens in the tagging
> job (mapToPair at HoodieJavaRDD.java:135), and with the insert operation it
> happens in the "Building workload profile" job. The error occurs whether the
> job runs as a Spark Structured Streaming job or as a batch job.
> The batch job code snippet is shared below; I obfuscated some values.
>
> from pyspark.sql import functions as f
> from pyspark.sql import SparkSession
> from pyspark.sql.types import *
>
> def main():
>
>     hudi_options = {
>         'hoodie.table.name': 'hudi_streaming_reco',
>         'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
>         'hoodie.datasource.write.table.name': 'hudi_streaming_reco',
>         'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
>         'hoodie.datasource.write.recordkey.field': 'id',
>         'hoodie.datasource.write.precombine.field': 'ts',
>         'hoodie.datasource.write.partitionpath.field': 'insert_hr:SIMPLE',
>         'hoodie.embed.timeline.server': False,
>         'hoodie.index.type': 'SIMPLE',
>         'hoodie.parquet.compression.codec': 'snappy',
>         'hoodie.clean.async': True,
>         'hoodie.parquet.max.file.size': 125829120,
>         'hoodie.parquet.small.file.limit': 104857600,
>         'hoodie.parquet.block.size': 125829120,
>         'hoodie.metadata.enable': True,
>         'hoodie.metadata.validate': True,
>         'hoodie.datasource.write.hive_style_partitioning': True,
>         'hoodie.datasource.hive_sync.support_timestamp': True,
>         'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://xxxxxx:xxxxx',
>         'hoodie.datasource.hive_sync.username': 'xxxxxxx',
>         'hoodie.datasource.hive_sync.password': 'xxxxxxx',
>         'hoodie.datasource.hive_sync.database': 'xxxxxxx',
>         'hoodie.datasource.hive_sync.table': 'hudi_streaming_reco',
>         'hoodie.datasource.hive_sync.partition_fields': 'insert_hr',
>         'hoodie.datasource.hive_sync.enable': True,
>         'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
>     }
>
>     spark = SparkSession.builder.getOrCreate()
>
>     inputPath = "s3://xxxxxxx/"
>
>     # Read each input file whole as one row, then extract fields from the JSON
>     transformedDF = (
>         spark
>         .read
>         .text(inputPath, wholetext=True)
>         .select(f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
>                 f.col("value").alias("raw_data"),
>                 f.get_json_object(f.col("value"), "$._id").alias("id"),
>                 f.get_json_object(f.col("value"), "$.metadata.createdDateTime").alias("ts"),
>                 f.input_file_name().alias("input_file_name"))
>     )
>
>     s3_output_path = "s3://xxxxxxx/"
>     transformedDF \
>         .write.format("hudi") \
>         .options(**hudi_options) \
>         .option('hoodie.datasource.write.operation', 'upsert') \
>         .save(s3_output_path, mode='append')
>
> if __name__ == "__main__":
>     main()
>
> Find the spark-submit command used (note: the extensions setting belongs under
> spark.sql.extensions, and the Kryo buffer size needs a unit suffix):
> spark-submit --master yarn \
>   --conf spark.driver.userClassPathFirst=true \
>   --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.kryoserializer.buffer.max=512m \
>   --num-executors 5 --executor-cores 3 --executor-memory 10g \
>   --driver-memory 30g --name big_file_batch --queue casualty \
>   --deploy-mode cluster big_record_test.py
>
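For anyone trying to reproduce this without the original data, a minimal sketch of a generator for a single-record JSON input file of a chosen size. This helper is hypothetical (not part of the report); TARGET_BYTES is scaled down here so the sketch runs quickly, and would be raised to 900 * 1024**2 to match the reported ~900 MB record.

```python
import json
import os
import tempfile

# Scaled-down target size for the sketch; the reported case is ~900 MB.
TARGET_BYTES = 1 * 1024 * 1024  # 1 MiB

def write_big_record(path: str, target_bytes: int) -> int:
    """Write one JSON record padded to roughly target_bytes and return the file size."""
    record = {
        "_id": "rec-001",  # matches the $._id extraction in the job
        "metadata": {"createdDateTime": "2024-02-15T11:35:19Z"},  # matches $.metadata.createdDateTime
        "payload": "x" * target_bytes,  # padding to inflate the single record
    }
    with open(path, "w") as f:
        json.dump(record, f)
    return os.path.getsize(path)

if __name__ == "__main__":
    path = os.path.join(tempfile.gettempdir(), "big_record.json")
    print(path, write_big_record(path, TARGET_BYTES))
```

Pointing the job's inputPath at a directory containing such a file should exercise the same code path, since spark.read.text(..., wholetext=True) loads the entire file as one row.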
--
This message was sent by Atlassian Jira
(v8.20.10#820010)