[ https://issues.apache.org/jira/browse/SPARK-41187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638495#comment-17638495 ]
wineternity edited comment on SPARK-41187 at 11/25/22 5:18 AM:
---------------------------------------------------------------
Added the corresponding logs from the prod env as an attachment. The number of resubmitted tasks equals the activeTasks count in the heap dump for that stage.

was (Author: yimo_yym): Add the corresponding logs in prod env:

> [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-41187
>                 URL: https://issues.apache.org/jira/browse/SPARK-41187
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.2, 3.2.0, 3.1.3, 3.3.1
>            Reporter: wineternity
>            Priority: Major
>         Attachments: image-2022-11-18-10-57-49-230.png, image-2022-11-18-11-01-57-435.png, image-2022-11-18-11-09-34-760.png, image-20221113232214179.png, image-20221113232233952.png
>
>
> We have a long-running thriftserver in which we found memory leaks. One of them is shown below.
> !image-2022-11-18-10-57-49-230.png!
> The event queue size in our prod env is set very large to avoid message drops, but we still see dropped messages in the log. Event processing takes very long, and events accumulate in the queue.
> In the heap dump we also found that the number of LiveExecutor instances had become very large. After examining the heap dump, we finally found the reason.
> !image-2022-11-18-11-01-57-435.png!
> The reason is:
> For the tasks of a shuffle map stage, if an executor loss happens, the already-finished tasks are resubmitted, and TaskSetManager.scala sends out a taskEnd message with reason "Resubmitted". This causes the activeTasks count in AppStatusListener's liveStage to become negative.
> {code:java}
> override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
>   // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
>   // and we are not using an external shuffle server which could serve the shuffle outputs.
>   // The reason is the next stage wouldn't be able to fetch the data from this dead executor
>   // so we would need to rerun these tasks on other executors.
>   if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
>     for ((tid, info) <- taskInfos if info.executorId == execId) {
>       val index = info.index
>       // We may have a running task whose partition has been marked as successful,
>       // this partition has another task completed in another stage attempt.
>       // We treat it as a running task and will call handleFailedTask later.
>       if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
>         successful(index) = false
>         copiesRunning(index) -= 1
>         tasksSuccessful -= 1
>         addPendingTask(index)
>         // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
>         // stage finishes when a total of tasks.size tasks finish.
>         sched.dagScheduler.taskEnded(
>           tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
>       }
>     }
>   }
> }{code}
> !image-2022-11-18-11-09-34-760.png!
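>
> To make the accounting concrete, here is a minimal, self-contained model of the counter behaviour (a hypothetical illustration, not Spark source): the listener decrements activeTasks on every task-end event, so the extra Resubmitted event for an already-counted task drives the counter below zero.
> {code:java}
> object ActiveTaskCounterDemo {
>   def main(args: Array[String]): Unit = {
>     // AppStatusListener-style accounting for one stage with a single task:
>     var activeTasks = 0
>     activeTasks += 1 // SparkListenerTaskStart: the task begins running
>     activeTasks -= 1 // SparkListenerTaskEnd(Success): the task finishes
>     // Executor lost: TaskSetManager resubmits the finished task and emits a
>     // second task-end event with reason Resubmitted. There is no matching
>     // task-start event, so the counter is decremented once too often.
>     activeTasks -= 1 // SparkListenerTaskEnd(Resubmitted)
>     println(s"activeTasks = $activeTasks") // prints -1: the stage never looks finished
>   }
> }{code}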
> If a liveStage's activeTasks count is negative, the stage is never removed. As a consequence, the executors moved to deadExecutors are never removed either, because a dead executor is only removed once no live stage has a submission time earlier than the executor's remove time:
> {code:java}
> /** Was the specified executor active for any currently live stages? */
> private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
>   liveStages.values.asScala.exists { stage =>
>     stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
>   }
> }
>
> override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
>   .....
>   // remove any dead executors that were not running for any currently active stages
>   deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
> }{code}
>
> I hope this describes it clearly. I will create a pull request later; the fix is simply to ignore the Resubmitted message in AppStatusListener, as sketched below.
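>
> For illustration, here is a minimal sketch of that idea as a standalone listener, assuming the fix amounts to an early return on the Resubmitted reason (the actual patch may differ in placement and details):
> {code:java}
> import org.apache.spark.Resubmitted
> import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
>
> class ResubmittedAwareListener extends SparkListener {
>   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
>     // A resubmitted task already produced a task-end event that was counted,
>     // so a Resubmitted event must not decrement activeTasks a second time.
>     if (event.reason == Resubmitted) {
>       return
>     }
>     // ... normal task accounting (decrement the live stage's activeTasks, etc.)
>   }
> }{code}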