hudi-bot opened a new issue, #16396:
URL: https://github.com/apache/hudi/issues/16396

   After upgrading from Hudi 0.11 to Hudi 0.13, big records (larger than 200 MB) cannot be written to the destination location due to an OOM error, even after increasing Spark memory resources.
   
   Error details: `java.lang.OutOfMemoryError: Java heap space`.
   
   The error never happened when running the same job using Hudi 0.11.
   
   Find below the use case details:
   Read one JSON file containing a single 900 MB record from an S3 source location, transform the DataFrame, then write the output DataFrame to an S3 target location. With the `upsert` Hudi operation, the error happens in the tagging job (`mapToPair at HoodieJavaRDD.java:135`), and with the `insert` operation it happens in the building-workload-profile job. The error occurs whether the job runs as a Spark Structured Streaming job or as a batch job.
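   For anyone trying to reproduce this locally, a helper like the one below can fabricate a comparable single-record JSON input file (the function name, path, and payload size are my own, scaled down from 900 MB; this is a sketch, not part of the reported job):

   ```python
   import json

   def make_single_record_json(path: str, payload_bytes: int) -> None:
       """Write one JSON document whose 'data' field is padded to roughly
       payload_bytes, mimicking a single very large record."""
       record = {
           "_id": "test-001",
           "metadata": {"createdDateTime": "2024-02-15T11:35:00Z"},
           "data": "x" * payload_bytes,
       }
       with open(path, "w") as fh:
           fh.write(json.dumps(record))

   # Scaled-down example: a 1 MiB payload instead of 900 MB.
   make_single_record_json("/tmp/big_record.json", 1024 * 1024)
   ```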
   
   Find the batch job code snippet below (some values are obfuscated).
   
    
   from pyspark.sql import functions as f
   from pyspark.sql import SparkSession


   def main():
       hudi_options = {
           'hoodie.table.name': 'hudi_streaming_reco',
           'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
           'hoodie.datasource.write.table.name': 'hudi_streaming_reco',
           'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
           'hoodie.datasource.write.recordkey.field': 'id',
           'hoodie.datasource.write.precombine.field': 'ts',
           'hoodie.datasource.write.partitionpath.field': 'insert_hr:SIMPLE',
           'hoodie.embed.timeline.server': False,
           'hoodie.index.type': 'SIMPLE',
           'hoodie.parquet.compression.codec': 'snappy',
           'hoodie.clean.async': True,
           'hoodie.parquet.max.file.size': 125829120,
           'hoodie.parquet.small.file.limit': 104857600,
           'hoodie.parquet.block.size': 125829120,
           'hoodie.metadata.enable': True,
           'hoodie.metadata.validate': True,
           'hoodie.datasource.write.hive_style_partitioning': True,
           'hoodie.datasource.hive_sync.support_timestamp': True,
           'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://xxxxxx:xxxxx',
           'hoodie.datasource.hive_sync.username': 'xxxxxxx',
           'hoodie.datasource.hive_sync.password': 'xxxxxxx',
           'hoodie.datasource.hive_sync.database': 'xxxxxxx',
           'hoodie.datasource.hive_sync.table': 'hudi_streaming_reco',
           'hoodie.datasource.hive_sync.partition_fields': 'insert_hr',
           'hoodie.datasource.hive_sync.enable': True,
           'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       }

       spark = SparkSession.builder.getOrCreate()

       inputPath = "s3://xxxxxxx/"

       # wholetext=True reads each input file as a single row, so a 900 MB
       # file becomes one 900 MB record.
       transformedDF = (
           spark
           .read
           .text(inputPath, wholetext=True)
           .select(
               f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
               f.col("value").alias("raw_data"),
               f.get_json_object(f.col("value"), "$._id").alias("id"),
               f.get_json_object(f.col("value"), "$.metadata.createdDateTime").alias("ts"),
               f.input_file_name().alias("input_file_name"),
           )
       )

       s3_output_path = "s3://xxxxxxx/"
       transformedDF \
           .write.format("hudi") \
           .options(**hudi_options) \
           .option('hoodie.datasource.write.operation', 'upsert') \
           .save(s3_output_path, mode='append')


   if __name__ == "__main__":
       main()
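   For readers unfamiliar with `get_json_object`, its behavior on the simple dotted paths used in the snippet can be sketched in pure Python (a rough analogue for illustration only, not Spark's implementation; it handles only dotted object paths like `$._id`):

   ```python
   import json

   def get_json_object(value: str, path: str):
       """Rough pure-Python analogue of Spark's get_json_object for
       simple dotted paths such as '$._id' or '$.metadata.createdDateTime'."""
       obj = json.loads(value)
       for key in path.lstrip("$.").split("."):
           if not isinstance(obj, dict) or key not in obj:
               return None  # Spark returns NULL for a missing path
           obj = obj[key]
       return obj if isinstance(obj, str) else json.dumps(obj)

   record = '{"_id": "abc-123", "metadata": {"createdDateTime": "2024-02-15T11:35:00Z"}}'
   print(get_json_object(record, "$._id"))                       # abc-123
   print(get_json_object(record, "$.metadata.createdDateTime"))  # 2024-02-15T11:35:00Z
   ```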
    
   
   Find the spark-submit command used:
   spark-submit --master yarn \
     --conf spark.driver.userClassPathFirst=true \
     --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kryoserializer.buffer.max=512m \
     --num-executors 5 --executor-cores 3 --executor-memory 10g --driver-memory 30g \
     --name big_file_batch --queue casualty --deploy-mode cluster \
     big_record_test.py
   
    
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-7412
   - Type: Bug
   - Fix version(s):
     - 1.1.0
   - Attachment(s):
  - image-2024-02-15-11-35-19-156.png (haitham, 15/Feb/24 17:35): https://issues.apache.org/jira/secure/attachment/13066751/image-2024-02-15-11-35-19-156.png
   
   
   ---
   
   
   ## Comments
   
   15/Feb/24 17:35, haitham: Update: the same error (OOM) also occurs when writing the DF using plain parquet format. See the Spark UI snapshot in the attached image-2024-02-15-11-35-19-156.png.

