[ https://issues.apache.org/jira/browse/SPARK-44679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774963#comment-17774963 ]

huangsheng commented on SPARK-44679:
------------------------------------

I think your problem is the same as [#SPARK-40622].

> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> -----------------------------------------------------------------
>
>                 Key: SPARK-44679
>                 URL: https://issues.apache.org/jira/browse/SPARK-44679
>             Project: Spark
>          Issue Type: Bug
>          Components: EC2, PySpark
>    Affects Versions: 3.2.1
>         Environment: We use Amazon EMR to run Pyspark jobs.
> Amazon EMR version : emr-6.7.0
> Installed applications: Tez 0.9.2, Spark 3.2.1, Hive 3.1.3, Sqoop 1.4.7,
> Hadoop 3.2.1, Zookeeper 3.5.7, HCatalog 3.1.3, Livy 0.7.1
>            Reporter: Haitham Eltaweel
>            Priority: Major
>         Attachments: code_sample.txt
>
>
> We get the following error from our PySpark application in the production environment:
> _java.lang.OutOfMemoryError: Requested array size exceeds VM limit_
> I have simplified the code we use and shared it below so you can easily
> investigate the issue.
> We use PySpark to read a 900 MB text file that contains a single record. We use the
> foreach function to iterate over the DataFrame and apply a higher-order function.
> The error occurs once the foreach action is triggered. I think the issue is
> related to the int-typed length of the byte array used to hold the
> serialized DataFrame row. Since the record is so large, the serialized
> record appears to exceed the maximum integer value, hence the error.
> Note that the same error happens when using the foreachBatch function with
> writeStream.
> Our production data has many records larger than 100 MB. We would appreciate your
> help in providing a fix or a workaround for this issue.
>  
> *Find below the code snippet:*
> from pyspark.sql import SparkSession, functions as f
>
> def check_file_name(row):
>     print("check_file_name called")
>
> def main():
>     spark = SparkSession.builder.enableHiveSupport().getOrCreate()
>     inputPath = "s3://bucket-name/common/source/"
>     inputDF = spark.read.text(inputPath, wholetext=True)
>     inputDF = inputDF.select(
>         f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
>         f.col("value").alias("raw_data"),
>         f.input_file_name().alias("input_file_name"))
>     inputDF.foreach(check_file_name)
>
> if __name__ == "__main__":
>     main()
> *Find below the spark-submit command used:*
> spark-submit --master yarn \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --num-executors 15 --executor-cores 4 --executor-memory 20g \
>   --driver-memory 20g --name haitham_job --deploy-mode cluster big_file_process.py
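
A minimal workaround sketch (an assumption on my part, not something from the ticket), in case check_file_name really only needs the file name as in the snippet above: drop the huge raw_data column before calling foreach, so the 900 MB value is never serialized row by row for the Python workers. Column and path names are copied from the snippet for illustration only.

from pyspark.sql import SparkSession, functions as f

def check_file_name(row):
    # Only the file name is used; the file contents are never shipped to Python.
    print("check_file_name called for", row.input_file_name)

def main():
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    inputPath = "s3://bucket-name/common/source/"  # illustrative path from the report
    inputDF = spark.read.text(inputPath, wholetext=True)
    # Keep only the small columns; without raw_data each serialized row stays far
    # below the ~2 GB JVM array limit when rows are sent to the Python workers.
    namesDF = inputDF.select(
        f.date_format(f.current_timestamp(), 'yyyyMMddHH').alias('insert_hr'),
        f.input_file_name().alias('input_file_name'))
    namesDF.foreach(check_file_name)

if __name__ == "__main__":
    main()

The executors still read the files themselves, but only the two small columns are serialized per row, which appears to be what hits the array-size limit in the original job.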


