[ https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergey Zhemzhitsky updated SPARK-26114:
---------------------------------------
    Attachment: run1-noparams-dominator-tree.png

> Memory leak of PartitionedPairBuffer when coalescing after repartitionAndSortWithinPartitions
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26114
>                 URL: https://issues.apache.org/jira/browse/SPARK-26114
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.2, 2.3.2, 2.4.0
>         Environment: Spark 3.0.0-SNAPSHOT (master branch)
>                      Scala 2.11
>                      Yarn 2.7
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>         Attachments: run1-noparams-dominator-tree.png
>
>
> Trying to use _coalesce_ after shuffle-oriented transformations leads to OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X GB of Y GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead_. The error happens when trying to specify a pretty small number of partitions in the _coalesce_ call.
>
> *How to reproduce?*
> # Start spark-shell
> {code:bash}
> spark-shell \
>   --num-executors=5 \
>   --executor-cores=2 \
>   --master=yarn \
>   --deploy-mode=client \
>   --conf spark.executor.memory=1g \
>   --conf spark.dynamicAllocation.enabled=false \
>   --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
> {code}
> Please note the _-Dio.netty.noUnsafe=true_ property: preventing off-heap memory usage seems to be the only way to control the amount of memory used for transferring shuffle data at the moment. Also note that the total number of cores allocated to the job is 5x2=10.
> # Then generate some test data
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.hadoop.io.compress._
> import org.apache.commons.lang._
> import org.apache.spark._
>
> // generate 100M records of sample data: 1000 partitions x 100000 records,
> // each record being a ~3-char key and a 1024-char value
> sc.makeRDD(1 to 1000, 1000)
>   .flatMap(item => (1 to 100000)
>     .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase)
>       -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
>   .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
> {code}
> # Run the sample job
> {code:scala}
> import org.apache.hadoop.io._
> import org.apache.spark._
> import org.apache.spark.storage._
>
> // read the generated data back, shuffle-sort it into 1000 partitions,
> // then coalesce down to 10 partitions without a shuffle
> val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(10, false)
>   .count
> {code}
> Note that the number of partitions in the _coalesce_ call is equal to the total number of cores allocated to the job. A rough size estimate for the generated data and a possible workaround are sketched below.
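> A back-of-the-envelope estimate (a sketch based only on the numbers above, not on measurements from this run) shows why the 1g executors are easily overwhelmed by this shuffle:
> {code:scala}
> // approximate uncompressed volume of the generated data set
> val records        = 1000L * 100000L  // 1000 partitions x 100000 records = 100M
> val bytesPerRecord = 3 + 1024         // ~3-char key + 1024-char value
> val totalGiB       = records * bytesPerRecord / math.pow(1024, 3)
> // totalGiB is roughly 95.6, versus only 5 executors x 1 GiB of heap each
> {code}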
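> As a possible workaround (an assumption on my part, not something confirmed in this report): the standard _coalesce_ overload with _shuffle = true_ repartitions through an extra shuffle instead of merging ~100 sorted shuffle partitions inside each task, though it also discards the within-partition ordering produced by the sort:
> {code:scala}
> // hypothetical variant of the sample job above using the shuffling coalesce
> rdd
>   .map(item => item._1.toString -> item._2.toString)
>   .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
>   .coalesce(10, shuffle = true)  // shuffle boundary instead of merging partitions in-task
>   .count
> {code}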