[ https://issues.apache.org/jira/browse/SPARK-41187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mridul Muralidharan resolved SPARK-41187.
-----------------------------------------
    Fix Version/s: 3.3.2
                   3.4.0
       Resolution: Fixed

> [Core] LiveExecutor memory leak in AppStatusListener when ExecutorLost happens
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-41187
>                 URL: https://issues.apache.org/jira/browse/SPARK-41187
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.2, 3.2.0, 3.1.3, 3.3.1
>            Reporter: wineternity
>            Assignee: wineternity
>            Priority: Major
>             Fix For: 3.3.2, 3.4.0
>
>         Attachments: image-2022-11-18-10-57-49-230.png, 
> image-2022-11-18-11-01-57-435.png, image-2022-11-18-11-09-34-760.png, 
> image-20221113232214179.png, image-20221113232233952.png
>
>
> We have a long-running Thrift server in which we found a memory leak. One of the leaks looks like this: !image-2022-11-18-10-57-49-230.png!
> The event queue size in our prod env is set very large to avoid dropping messages, but we still see messages dropped in the logs. Event processing time is very long, and events accumulate in the queue.
> In the heap dump we also found a huge number of LiveExecutor instances. After examining the heap dump, we finally found the reason. 
> !image-2022-11-18-11-01-57-435.png!
> !image-2022-11-18-11-01-57-435.png!
> The reason is:
> For the tasks of a shuffle map stage, if an executor lost happens, the already finished tasks are resubmitted, and TaskSetManager.scala sends out a taskEnd message with reason "Resubmitted" for each of them. This causes the activeTasks counter in the AppStatusListener's liveStage to become negative:
> {code:java}
> override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
>   // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
>   // and we are not using an external shuffle server which could serve the shuffle outputs.
>   // The reason is the next stage wouldn't be able to fetch the data from this dead executor
>   // so we would need to rerun these tasks on other executors.
>   if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
>     for ((tid, info) <- taskInfos if info.executorId == execId) {
>       val index = info.index
>       // We may have a running task whose partition has been marked as successful,
>       // this partition has another task completed in another stage attempt.
>       // We treat it as a running task and will call handleFailedTask later.
>       if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
>         successful(index) = false
>         copiesRunning(index) -= 1
>         tasksSuccessful -= 1
>         addPendingTask(index)
>         // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
>         // stage finishes when a total of tasks.size tasks finish.
>         sched.dagScheduler.taskEnded(
>           tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
>       }
>     }
>   }
> {code}
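> For illustration, here is a minimal, self-contained sketch (not the Spark source; the accounting is simplified) of how the listener-side counter goes negative: AppStatusListener increments the stage's activeTasks on each task start and decrements it on each task end, but a Resubmitted task end has no matching task start.
> {code:java}
> // Simplified model of the AppStatusListener accounting (assumption: the real
> // listener does stage.activeTasks += 1 on task start and -= 1 on task end).
> object ActiveTasksDemo {
>   def main(args: Array[String]): Unit = {
>     var activeTasks = 0
>     activeTasks += 1 // onTaskStart: task 0 starts
>     activeTasks -= 1 // onTaskEnd: task 0 finishes with Success
>     activeTasks -= 1 // onTaskEnd: same task reported again as Resubmitted,
>                      // with no second onTaskStart
>     println(s"activeTasks = $activeTasks") // prints -1
>   }
> }
> {code}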
>  
> !image-2022-11-18-11-09-34-760.png!
> If a liveStage's activeTasks is negative, the stage is never removed, and consequently executors moved to deadExecutors are never removed either, because before a dead executor can be removed there must be no live stage whose submission time is earlier than the executor's remove time:
> {code:java}
> /** Was the specified executor active for any currently live stages? */
> private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
>   liveStages.values.asScala.exists { stage =>
>     stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
>   }
> }
>
> override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
>   .....
>   // remove any dead executors that were not running for any currently active stages
>   deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
> }
> {code}
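> In our reading (a simplified excerpt, not the exact source), the stage is only dropped from liveStages when activeTasks is exactly zero, which a negative counter can never satisfy, so the stage stays in liveStages forever and keeps every later dead executor alive through the check above:
> {code:java}
> // Simplified from AppStatusListener.onStageCompleted (our reading):
> // the stage is removed from liveStages only when no active tasks remain.
> // activeTasks < 0 never passes this check.
> val removeStage = stage.activeTasks == 0
> if (removeStage) {
>   liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
> }
> {code}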
>  
> The corresponding logs from our prod env are attached. The number of resubmitted tasks equals the activeTasks count in the heap dump for that stage.
> !image-20221113232214179.png!
> !image-20221113232233952.png!
> I hope this describes it clearly. I will create a pull request later; the fix simply ignores the Resubmitted message in AppStatusListener, as sketched below.
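> For illustration, the fix we have in mind is roughly the following guard at the top of onTaskEnd (a sketch, not the final patch):
> {code:java}
> // Sketch of the proposed fix: drop the synthetic Resubmitted task-end event,
> // since it has no matching task-start and would otherwise drive
> // liveStage.activeTasks negative.
> override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
>   if (event.reason == Resubmitted) {
>     return
>   }
>   // ... existing accounting ...
> }
> {code}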


