[ 
https://issues.apache.org/jira/browse/SPARK-35391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Graves reassigned SPARK-35391:
-------------------------------------

    Assignee: Vasily Kolpakov

> Memory leak in ExecutorAllocationListener breaks dynamic allocation under 
> high load
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-35391
>                 URL: https://issues.apache.org/jira/browse/SPARK-35391
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.1
>            Reporter: Vasily Kolpakov
>            Assignee: Vasily Kolpakov
>            Priority: Major
>             Fix For: 3.2.0, 3.1.3
>
>
> ExecutorAllocationListener doesn't clean up data properly. 
> ExecutorAllocationListener performs progressively slower and eventually fails 
> to process events in time.
> There are two problems:
>  * a bug (typo?) in totalRunningTasksPerResourceProfile() method
>  getOrElseUpdate() is used instead of getOrElse().
>  If spark-dynamic-executor-allocation thread calls schedule() after a 
> SparkListenerTaskEnd event for the last task in a stage
>  but before SparkListenerStageCompleted event for the stage, then 
> stageAttemptToNumRunningTask will not be cleaned up properly.
>  * resourceProfileIdToStageAttempt clean-up is broken
>  If a SparkListenerTaskEnd event for the last task in a stage was processed 
> before SparkListenerStageCompleted for that stage,
>  then resourceProfileIdToStageAttempt will not be cleaned up properly.
>  
> Bugs were introduced in this commit: 
> https://github.com/apache/spark/commit/496f6ac86001d284cbfb7488a63dd3a168919c0f
>  .
> Steps to reproduce:
>  # Launch standalone master and worker with 
> 'spark.shuffle.service.enabled=true'
>  # Run spark-shell with --conf 'spark.shuffle.service.enabled=true' --conf 
> 'spark.dynamicAllocation.enabled=true' and paste this script
> {code:java}
> for (_ <- 0 until 10) {
>     Seq(1, 2, 3, 4, 5).toDF.repartition(100).agg("value" -> "sum").show()
> }
> {code}
>  # make a heap dump and examine 
> ExecutorAllocationListener.totalRunningTasksPerResourceProfile and 
> ExecutorAllocationListener.resourceProfileIdToStageAttempt fields
> Expected: totalRunningTasksPerResourceProfile and 
> resourceProfileIdToStageAttempt(defaultResourceProfileId) are empty
> Actual: totalRunningTasksPerResourceProfile and 
> resourceProfileIdToStageAttempt(defaultResourceProfileId) contain 
> non-relevant data
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to