[ 
https://issues.apache.org/jira/browse/SPARK-26114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Zhemzhitsky updated SPARK-26114:
---------------------------------------
    Description: 
Trying to use _coalesce_ after shuffle-oriented transformations leads to 
OutOfMemoryErrors or _Container killed by YARN for exceeding memory limits. X 
GB of Y GB physical memory used. Consider boosting 
spark.yarn.executor.memoryOverhead_.
The discussion is 
[here|http://apache-spark-developers-list.1001551.n3.nabble.com/Coalesce-behaviour-td25289.html].

The error happens when specifying a fairly small number of partitions in the 
_coalesce_ call.
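
For context, here is a minimal sketch of the two _coalesce_ modes involved (an 
illustration added for clarity, not part of the original report; it assumes a 
running spark-shell with an active SparkContext _sc_ and uses a small synthetic 
RDD instead of the real data set):
{code:scala}
import org.apache.spark.HashPartitioner

// Illustration only: a small shuffled RDD standing in for the real data set.
val shuffled = sc.parallelize(1 to 1000000, 100)
  .map(i => (i % 1000).toString -> i.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))

// shuffle = false: 10 tasks, each one merging ~100 parent shuffle partitions
// inside the same task.
shuffled.coalesce(10, shuffle = false).count

// shuffle = true (same as repartition(10)): an extra shuffle stage is added,
// but every task again consumes a single shuffle partition.
shuffled.coalesce(10, shuffle = true).count
{code}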

*How to reproduce?*

# Start spark-shell
{code:bash}
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memoryOverhead=512 \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
{code}
Please note the _-Dio.netty.noUnsafe=true_ property: preventing off-heap memory 
usage currently seems to be the only way to control the amount of memory used 
for transferring shuffle data.
Also note that the total number of cores allocated to the job is 5x2=10 (the 
effective settings can be double-checked from the shell; see the sketch right 
after the reproduction steps).
# Then generate some test data
{code:scala}
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data (1000 partitions x 100000 records,
// with 3-character lowercase keys and 1KB values)
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 100000)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
              new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
{code}
# Run the sample job
{code:scala}
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  // merge the 1000 shuffle partitions into 10 partitions without an extra shuffle
  .coalesce(10, false)
  .count
{code}
Note that the number of partitions passed to _coalesce_ is equal to the total 
number of cores allocated to the job.
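
Optionally, the effective settings can be read back from the running 
spark-shell (a helper sketch added for convenience, not part of the original 
reproduction steps; the keys are standard Spark configuration properties):
{code:scala}
// Run inside the spark-shell started in step 1 to confirm the effective settings.
// Defaults are printed if a property was not set explicitly.
val conf = sc.getConf
println(conf.get("spark.executor.instances", "<default>"))       // expected: 5
println(conf.get("spark.executor.cores", "<default>"))           // expected: 2
println(conf.get("spark.executor.memory", "<default>"))          // expected: 1g
println(conf.get("spark.executor.memoryOverhead", "<default>"))  // expected: 512
println(conf.get("spark.executor.extraJavaOptions", "<default>"))
// Total number of concurrently running tasks = 5 executors x 2 cores = 10
{code}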

Here is the dominator tree from the heap dump:
 !run1-noparams-dominator-tree.png|width=700!

There are 4 instances of ExternalSorter, although there are only 2 concurrently 
running tasks per executor:
 !run1-noparams-dominator-tree-externalsorter.png|width=700!

And here are the paths to the GC roots of an ExternalSorter that has already 
been stopped:
 !run1-noparams-dominator-tree-externalsorter-gc-root.png|width=700!
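
For clarity, a back-of-the-envelope calculation of the expected numbers (added 
as an illustration; the variable names are only descriptive, the figures come 
from the reproduction above):
{code:scala}
// Rough arithmetic for this reproduction (illustration only).
val shufflePartitions   = 1000   // HashPartitioner(1000)
val coalescedPartitions = 10     // coalesce(10, false)
val executors           = 5
val coresPerExecutor    = 2

val parentPartitionsPerTask    = shufflePartitions / coalescedPartitions  // = 100
val concurrentTasksPerExecutor = coresPerExecutor                         // = 2
// With only 2 tasks running per executor, at most 2 live ExternalSorter
// instances per executor would be expected at any time; the dominator tree
// shows 4, i.e. sorters of already consumed parent partitions are still retained.
{code}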

> Memory leak of PartitionedPairBuffer when coalescing after 
> repartitionAndSortWithinPartitions
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26114
>                 URL: https://issues.apache.org/jira/browse/SPARK-26114
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.2, 2.3.2, 2.4.0
>         Environment: Spark 3.0.0-SNAPSHOT (master branch)
> Scala 2.11
> Yarn 2.7
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>         Attachments: run1-noparams-dominator-tree-externalsorter-gc-root.png, 
> run1-noparams-dominator-tree-externalsorter.png, 
> run1-noparams-dominator-tree.png
>


