[ 
https://issues.apache.org/jira/browse/SPARK-45537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-45537:
-----------------------------------
    Labels: pull-request-available  (was: )

> The last Task may get stuck in multi resource-profile
> -----------------------------------------------------
>
>                 Key: SPARK-45537
>                 URL: https://issues.apache.org/jira/browse/SPARK-45537
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 4.0.0
>            Reporter: liangyongyuan
>            Priority: Major
>              Labels: pull-request-available
>
> Description:
> In scenarios involving multiple resource profiles (e.g., prof0 and prof1), 
> the last task of a task set can become permanently unschedulable. Suppose a 
> task set using prof1 has one remaining task (task0) awaiting scheduling, and 
> there are two executors: executor0 with prof0 and executor1 with prof1. If 
> task0 fails on executor1, executor1 is excluded for it. task0 then has no 
> executor it can run on, and task scheduling stalls.
> Example Code:
> {code:java}
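> import org.apache.spark.TaskContext
> import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}
>
> // Build a non-default resource profile for the stage.
> val rprof = new ResourceProfileBuilder()
> val ereqs = new ExecutorResourceRequests()
> ereqs.memory("4g")
> ereqs.memoryOverhead("2g").offHeapMemory("1g")
> val resourceProfile = rprof.require(ereqs).build()
> // A single partition, so the stage has exactly one task.
> val rdd = sc.parallelize(1 to 10, 1).withResources(resourceProfile)
> rdd.map(num => {
>   // Fail the first attempt, which can get the executor excluded for this task.
>   if (TaskContext.get().attemptNumber() == 0) {
>     throw new RuntimeException("First attempt encounters an error")
>   } else {
>     num / 2
>   }
> }).collect()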
> {code}
> Issue:
> When no task could be launched, the scheduler checks whether the task set has 
> become unschedulable by looking for a pending task that cannot run on any 
> executor. That check considers every executor regardless of resource 
> profile. It therefore assumes, incorrectly, that executor0 (prof0) can 
> run task0, so the task is not reported as completely excluded.
> Relevant Code:
> {code:java}
> if (!launchedAnyTask) {
>   taskSet.getCompletelyExcludedTaskIfAny(hostToExecutors) ........
> }
>
> def getCompletelyExcludedTaskIfAny(
>       hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
> ........
>   pendingTask.find { indexInTaskSet =>
>     // try to find some executor this task can run on.
>     // It's possible that some *other* task isn't schedulable anywhere,
>     // but we will discover that in some later call, when that unschedulable
>     // task is the last task remaining.
>     hostToExecutors.forall { case (host, execsOnHost) =>
>       // Check if the task can run on the node
>       val nodeExcluded =
>         appHealthTracker.isNodeExcluded(host) ||
>           taskSetExcludelist.isNodeExcludedForTaskSet(host) ||
>           taskSetExcludelist.isNodeExcludedForTask(host, indexInTaskSet)
>       if (nodeExcluded) {
>         true
>       } else {
>         // Check if the task can run on any of the executors
>         execsOnHost.forall { exec =>
>           appHealthTracker.isExecutorExcluded(exec) ||
>             taskSetExcludelist.isExecutorExcludedForTaskSet(exec) ||
>             taskSetExcludelist.isExecutorExcludedForTask(exec, indexInTaskSet)
>         }
>       }
>     }
>   }
> }
> {code}
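> Illustrative sketch (not Spark's code and not the actual patch):
> The self-contained model below reproduces the check's logic for the scenario 
> described above and contrasts it with a profile-aware variant. The names 
> Exclusions, completelyExcludedForProfile, and executorProfileId are 
> hypothetical, and treating profiles as compatible only when their ids are 
> equal is a simplifying assumption.
> {code:scala}
> import scala.collection.mutable.{HashMap, HashSet}
>
> object ProfileAwareExclusionSketch {
>   // Hypothetical stand-in for the exclusion state: (executorId, taskIndex)
>   // pairs on which the task must not run again.
>   final case class Exclusions(excludedForTask: Set[(String, Int)]) {
>     def isExecutorExcludedForTask(exec: String, index: Int): Boolean =
>       excludedForTask.contains((exec, index))
>   }
>
>   // Same shape as the quoted check: the task is completely excluded only
>   // if every executor on every host is excluded for it.
>   def completelyExcluded(
>       index: Int,
>       hostToExecutors: HashMap[String, HashSet[String]],
>       exclusions: Exclusions): Boolean =
>     hostToExecutors.forall { case (_, execsOnHost) =>
>       execsOnHost.forall(exec => exclusions.isExecutorExcludedForTask(exec, index))
>     }
>
>   // Profile-aware variant: an executor with a different resource profile id
>   // is treated as unusable (assumption: strict id equality).
>   def completelyExcludedForProfile(
>       index: Int,
>       taskSetProfileId: Int,
>       hostToExecutors: HashMap[String, HashSet[String]],
>       executorProfileId: Map[String, Int],
>       exclusions: Exclusions): Boolean =
>     hostToExecutors.forall { case (_, execsOnHost) =>
>       execsOnHost.forall { exec =>
>         !executorProfileId.get(exec).contains(taskSetProfileId) ||
>           exclusions.isExecutorExcludedForTask(exec, index)
>       }
>     }
>
>   def main(args: Array[String]): Unit = {
>     val hostToExecutors = HashMap(
>       "host0" -> HashSet("executor0"),
>       "host1" -> HashSet("executor1"))
>     val executorProfileId = Map("executor0" -> 0, "executor1" -> 1)
>     // task0 already failed on executor1.
>     val exclusions = Exclusions(Set(("executor1", 0)))
>
>     // Profile-blind check: executor0 looks usable, so this prints false.
>     println(completelyExcluded(0, hostToExecutors, exclusions))
>     // Profile-aware check for a prof1 task set: prints true.
>     println(completelyExcludedForProfile(
>       0, 1, hostToExecutors, executorProfileId, exclusions))
>   }
> }
> {code}
> In this model the profile-blind check never reports task0 as completely 
> excluded, which matches the stall described above, while the profile-aware 
> variant does.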



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
