[
https://issues.apache.org/jira/browse/SPARK-45537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-45537:
-----------------------------------
Labels: pull-request-available (was: )
> The last Task may get stuck in multi resource-profile
> -----------------------------------------------------
>
> Key: SPARK-45537
> URL: https://issues.apache.org/jira/browse/SPARK-45537
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 4.0.0
> Reporter: liangyongyuan
> Priority: Major
> Labels: pull-request-available
>
> Description:
> In scenarios involving multiple resource profiles (e.g., prof0 and prof1),
> suppose a taskset bound to prof1 has only one remaining task (task0) awaiting
> scheduling, and there are two executors: executor0 with prof0 and executor1
> with prof1. If executor1 fails to run task0, executor1 is excluded
> (blacklisted) for that task. Consequently, task0 becomes unschedulable and
> the taskset gets stuck in the task scheduling process.
> Example Code:
> {code:java}
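> import org.apache.spark.TaskContext
> import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder}
>
> // `sc` is the SparkContext, e.g. as provided by spark-shell.
> val rprof = new ResourceProfileBuilder()
> val ereqs = new ExecutorResourceRequests()
> ereqs.memory("4g")
> ereqs.memoryOverhead("2g").offHeapMemory("1g")
> val resourceProfile = rprof.require(ereqs).build()
> val rdd = sc.parallelize(1 to 10, 1).withResources(resourceProfile)
> rdd.map { num =>
>   // Fail the first attempt so that the single task has to be retried.
>   if (TaskContext.get().attemptNumber() == 0) {
>     throw new RuntimeException("First attempt encounters an error")
>   } else {
>     num / 2
>   }
> }.collect()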
> {code}
> Issue:
> The issue arises when the taskSet becomes unschedulable. The scheduler then
> tries to find a task that cannot be scheduled on any executor, across all
> profiles. However, the check for whether an executor can run the task does
> not take the executor's resource profile into account. As a result, it
> incorrectly assumes that executor0 (prof0) can schedule task0, which is not
> the case.
> Relevant Code:
> {code:java}
> if (!launchedAnyTask) {
>   taskSet.getCompletelyExcludedTaskIfAny(hostToExecutors) ........
> }
>
> def getCompletelyExcludedTaskIfAny(
>     hostToExecutors: HashMap[String, HashSet[String]]): Option[Int] = {
>   ........
>   pendingTask.find { indexInTaskSet =>
>     // try to find some executor this task can run on.
>     // It's possible that some *other* task isn't schedulable anywhere,
>     // but we will discover that in some later call, when that unschedulable
>     // task is the last task remaining.
>     hostToExecutors.forall { case (host, execsOnHost) =>
>       // Check if the task can run on the node
>       val nodeExcluded =
>         appHealthTracker.isNodeExcluded(host) ||
>           taskSetExcludelist.isNodeExcludedForTaskSet(host) ||
>           taskSetExcludelist.isNodeExcludedForTask(host, indexInTaskSet)
>       if (nodeExcluded) {
>         true
>       } else {
>         // Check if the task can run on any of the executors
>         execsOnHost.forall { exec =>
>           appHealthTracker.isExecutorExcluded(exec) ||
>             taskSetExcludelist.isExecutorExcludedForTaskSet(exec) ||
>             taskSetExcludelist.isExecutorExcludedForTask(exec, indexInTaskSet)
>         }
>       }
>     }
>   }
> }
> {code}
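
To make the mismatch described above concrete, here is a minimal, self-contained sketch of the two checks. It is a simplified model and not Spark code: `Exec`, `excludedEverywhere` and `excludedEverywhereForProfile` are hypothetical names introduced only for illustration, and the profile-aware variant is just one possible way to express the idea, not necessarily what the linked pull request does.

```scala
// Hypothetical, simplified model of the check; none of these names are Spark APIs.
object ProfileAwareExclusionSketch {
  // An executor and the id of the resource profile it was launched with.
  final case class Exec(id: String, profileId: Int)

  // Same shape as the existing check: the task counts as completely excluded
  // only if every known executor is excluded for it, regardless of profile.
  def excludedEverywhere(execs: Seq[Exec], isExcluded: String => Boolean): Boolean =
    execs.forall(e => isExcluded(e.id))

  // Profile-aware variant (an assumption, for illustration): an executor with a
  // different resource profile can never run the task, so it is treated the
  // same way as an excluded executor.
  def excludedEverywhereForProfile(
      execs: Seq[Exec],
      taskSetProfileId: Int,
      isExcluded: String => Boolean): Boolean =
    execs.forall(e => e.profileId != taskSetProfileId || isExcluded(e.id))

  def main(args: Array[String]): Unit = {
    val execs = Seq(Exec("executor0", 0), Exec("executor1", 1))
    // executor1 failed task0 and is now excluded for it.
    val excluded: Set[String] = Set("executor1")

    // executor0 is not excluded, so the task is not reported as unschedulable.
    println(excludedEverywhere(execs, excluded)) // prints "false"
    // executor0 has the wrong profile, so no usable executor remains for prof1.
    println(excludedEverywhereForProfile(execs, 1, excluded)) // prints "true"
  }
}
```

In this model the first check returns false because executor0 looks usable, which matches the stuck taskset in the report. The second check returns true, which would let the scheduler recognise task0 as unschedulable.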