[ https://issues.apache.org/jira/browse/SPARK-44679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17774963#comment-17774963 ]
huangsheng commented on SPARK-44679:
------------------------------------

I think your problem is the same as SPARK-40622.

> java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> -----------------------------------------------------------------
>
>                 Key: SPARK-44679
>                 URL: https://issues.apache.org/jira/browse/SPARK-44679
>             Project: Spark
>          Issue Type: Bug
>          Components: EC2, PySpark
>    Affects Versions: 3.2.1
>         Environment: We use Amazon EMR to run PySpark jobs.
>                      Amazon EMR version: emr-6.7.0
>                      Installed applications: Tez 0.9.2, Spark 3.2.1, Hive 3.1.3, Sqoop 1.4.7, Hadoop 3.2.1, Zookeeper 3.5.7, HCatalog 3.1.3, Livy 0.7.1
>            Reporter: Haitham Eltaweel
>            Priority: Major
>         Attachments: code_sample.txt
>
> We get the following error from our PySpark application in the production environment:
>
> _java.lang.OutOfMemoryError: Requested array size exceeds VM limit_
>
> I simplified the code we use and shared it below so you can easily investigate the issue.
> We use PySpark to read a 900 MB text file that contains a single record. We use the foreach function to iterate over the DataFrame and apply a higher-order function. The error occurs once the foreach action is triggered. I think the issue is related to the integer-indexed byte array used to hold the serialized DataFrame row: since the record is so large, the serialized record exceeds the maximum integer value, hence the error. Note that the same error happens when using the foreachBatch function with writeStream.
> Our production data has many records larger than 100 MB. We would appreciate a fix or a workaround for this issue.
>
> *Find below the code snippet:*
>
> from pyspark.sql import SparkSession, functions as f
>
> def check_file_name(row):
>     print("check_file_name called")
>
> def main():
>     spark = SparkSession.builder.enableHiveSupport().getOrCreate()
>     inputPath = "s3://bucket-name/common/source/"
>     inputDF = spark.read.text(inputPath, wholetext=True)
>     inputDF = inputDF.select(
>         f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
>         f.col("value").alias("raw_data"),
>         f.input_file_name().alias("input_file_name"))
>     inputDF.foreach(check_file_name)
>
> if __name__ == "__main__":
>     main()
>
> *Find below the spark-submit command used:*
>
> spark-submit --master yarn \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --num-executors 15 --executor-cores 4 \
>   --executor-memory 20g --driver-memory 20g \
>   --name haitham_job --deploy-mode cluster big_file_process.py
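For reference, a possible workaround (a minimal sketch, not tested against this exact EMR setup; it reuses the names from the snippet above): since check_file_name never reads the 900 MB raw_data column, dropping that column before calling foreach should keep each row that gets serialized and shipped to the Python workers far below the roughly 2 GB limit on a single JVM byte array.

*Find below the workaround sketch:*

from pyspark.sql import SparkSession, functions as f

def check_file_name(row):
    # Only the (small) file name is used here, so the huge raw_data
    # column does not need to reach this function at all.
    print("check_file_name called for", row.input_file_name)

def main():
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    inputPath = "s3://bucket-name/common/source/"
    inputDF = spark.read.text(inputPath, wholetext=True)
    # Select only the small columns before foreach, so the 900 MB
    # "value" column is pruned and never crosses the JVM/Python boundary.
    smallDF = inputDF.select(
        f.date_format(f.current_timestamp(), 'yyyyMMddHH').astype('string').alias('insert_hr'),
        f.input_file_name().alias('input_file_name'))
    smallDF.foreach(check_file_name)

if __name__ == "__main__":
    main()

If the full file content is actually needed per record, this does not help: a single serialized row still has to fit in one JVM byte array, so splitting the input into smaller files before ingestion is probably the only option until the underlying limitation is addressed.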