[
https://issues.apache.org/jira/browse/SPARK-44679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774963#comment-17774963
]
huangsheng commented on SPARK-44679:
------------------------------------
I think your problem is the same as [#SPARK-40622]
> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> -----------------------------------------------------------------
>
> Key: SPARK-44679
> URL: https://issues.apache.org/jira/browse/SPARK-44679
> Project: Spark
> Issue Type: Bug
> Components: EC2, PySpark
> Affects Versions: 3.2.1
> Environment: We use Amazon EMR to run Pyspark jobs.
> Amazon EMR version : emr-6.7.0
> Installed applications :
> Tez 0.9.2, Spark 3.2.1, Hive 3.1.3, Sqoop 1.4.7, Hadoop 3.2.1, Zookeeper
> 3.5.7, HCatalog 3.1.3, Livy 0.7.1
> Reporter: Haitham Eltaweel
> Priority: Major
> Attachments: code_sample.txt
>
>
> We get the following error from our Pyspark application in Production env:
> _java.lang.OutOfMemoryError: Requested array size exceeds VM limit_
> I simplified the code we used and shared it below so you can easily
> investigate the issue.
> We use PySpark to read a 900 MB text file which has one record. We use the foreach
> function to iterate over the DataFrame and apply a higher-order function.
> The error occurs once foreach action is triggered. I think the issue is
> related to the integer data type of the bytes array used to hold the
> serialized dataframe. Since the dataframe record was too big, it seems the
> serialized record exceeded the max integer value, hence the error occurred.
> Note that the same error happens when using foreachBatch function with
> writeStream.
> Our prod data has many records larger than 100 MB. Appreciate your help to
> provide a fix or a solution to that issue.
>
> *Find below the code snippet:*
> from pyspark.sql import SparkSession, functions as f
>
> def check_file_name(row):
>     print("check_file_name called")
>
> def main():
>     spark = SparkSession.builder.enableHiveSupport().getOrCreate()
>     inputPath = "s3://bucket-name/common/source/"
>     inputDF = spark.read.text(inputPath, wholetext=True)
>     inputDF = inputDF.select(
>         f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
>         f.col("value").alias("raw_data"),
>         f.input_file_name().alias("input_file_name"))
>     inputDF.foreach(check_file_name)
>
> if __name__ == "__main__":
>     main()
> *Find below spark-submit command used:*
> spark-submit --master yarn --conf
> spark.serializer=org.apache.spark.serializer.KryoSerializer --num-executors
> 15 --executor-cores 4 --executor-memory 20g --driver-memory 20g --name
> haitham_job --deploy-mode cluster big_file_process.py
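
As a rough check of the hypothesis in the quoted description (that the serialized record outgrows an int-indexed byte array), here is a small arithmetic sketch in plain Python. It only assumes that a JVM array length cannot exceed Integer.MAX_VALUE (2^31 - 1). The helper names, the 4096-byte starting capacity, and the doubling growth policy are hypothetical illustrations, not a description of what Spark or Kryo actually do internally.

```python
# Upper bound on a JVM array length: arrays are indexed by a signed 32-bit int.
# (Real JVMs may cap slightly below this; treat it as an optimistic bound.)
JVM_MAX_ARRAY_LENGTH = 2**31 - 1


def fits_in_jvm_byte_array(num_bytes: int) -> bool:
    """Return True if a byte[] of this length could be requested at all."""
    return 0 <= num_bytes <= JVM_MAX_ARRAY_LENGTH


def capacity_after_doubling(initial_capacity: int, needed_bytes: int) -> int:
    """Hypothetical growth policy: double the buffer until the payload fits."""
    capacity = initial_capacity
    while capacity < needed_bytes:
        capacity *= 2
    return capacity


# The single 900 MB record from the report.
record_bytes = 900 * 1024 * 1024

# The raw record on its own is below the array limit.
print(fits_in_jvm_byte_array(record_bytes))

# Assumption: some in-memory or serialized form is about 2x the raw size.
inflated_bytes = 2 * record_bytes
requested = capacity_after_doubling(4096, inflated_bytes)

# Under that assumption the doubling buffer would ask for 2**31 bytes,
# one more than the largest array length a JVM can represent.
print(requested, fits_in_jvm_byte_array(requested))
```

If these assumptions are anywhere near reality, the 900 MB file itself is not over the limit, but any representation a little over twice its size would be, which is consistent with the error appearing only once the foreach action serializes the row.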