[
https://issues.apache.org/jira/browse/SPARK-23423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364861#comment-16364861
]
Stavros Kontopoulos edited comment on SPARK-23423 at 2/14/18 11:56 PM:
-----------------------------------------------------------------------
Hi [~igor.berman]. Looking at the code again I think when there is a status
update tasksIds of dead tasks are removed:
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L732]
Slaves are not removed but task Ids are, maybe something else is not working.
Do you have a log at the time of the issue to attach?
The test you have is ok but I think it does not trigger deletion for the tasks
in the case of a failure. I think you need to update the backend with task
status:
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala#L102-L103]
The following test passes:
{{{code:scala}}}
test("max executors registered stops to accept offers when dynamic allocation
enabled")
{ setBackend(Map( "spark.dynamicAllocation.maxExecutors" -> "1",
"spark.dynamicAllocation.enabled" -> "true", "spark.dynamicAllocation.testing"
-> "true")) backend.doRequestTotalExecutors(1) val (mem, cpu) =
(backend.executorMemory(sc), 4) val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava) verifyTaskLaunched(driver,
"o1") backend.doKillExecutors(List("0")) verify(driver,
times(1)).killTask(createTaskId("0")) val status = createTaskStatus("0", "s1",
TaskState.TASK_KILLED) backend.statusUpdate(driver, status) val offer2 =
createOffer("o2", "s2", mem, cpu) backend.resourceOffers(driver,
List(offer2).asJava) // verify(driver, times(1)).declineOffer(offer2.getId) val
taskInfos = verifyTaskLaunched(driver, "o2") assert(taskInfos.length == 1) }
{{{code}}}
Btw the behavior for checking the upper limit of the num of the executors you
are referring to is defined in different places:
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L354]
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573]
The latter exists for very long time. The former was added with Spark-16944.
Essentially they do check the same thing.
was (Author: skonto):
Hi [~igor.berman]. Looking at the code again I think when there is a status
update tasksIds of dead tasks are removed:
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L732]
Slaves are not removed but task Ids are, maybe something else is not working.
Do you have a log at the time of the issue to attach?
The test you have is ok but I think it does not trigger deletion for the tasks
in the case of a failure. I think you need to update the backend with task
status:
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala#L102-L103]
The following test passes:
```
test("max executors registered stops to accept offers when dynamic allocation
enabled") {
setBackend(Map(
"spark.dynamicAllocation.maxExecutors" -> "1",
"spark.dynamicAllocation.enabled" -> "true",
"spark.dynamicAllocation.testing" -> "true"))
backend.doRequestTotalExecutors(1)
val (mem, cpu) = (backend.executorMemory(sc), 4)
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
verifyTaskLaunched(driver, "o1")
backend.doKillExecutors(List("0"))
verify(driver, times(1)).killTask(createTaskId("0"))
val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
backend.statusUpdate(driver, status)
val offer2 = createOffer("o2", "s2", mem, cpu)
backend.resourceOffers(driver, List(offer2).asJava)
// verify(driver, times(1)).declineOffer(offer2.getId)
val taskInfos = verifyTaskLaunched(driver, "o2")
assert(taskInfos.length == 1)
}
```
Btw the behavior for checking the upper limit of the num of the executors you
are referring to is defined in different places:
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L354]
[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573]
The latter exists for very long time. The former was added with Spark-16944.
Essentially they do check the same thing.
> Application declines any offers when killed+active executors rich
> spark.dynamicAllocation.maxExecutors
> ------------------------------------------------------------------------------------------------------
>
> Key: SPARK-23423
> URL: https://issues.apache.org/jira/browse/SPARK-23423
> Project: Spark
> Issue Type: Bug
> Components: Mesos, Spark Core
> Affects Versions: 2.2.1
> Reporter: Igor Berman
> Priority: Major
>
> Hi
> I've noticed rather strange behavior of MesosCoarseGrainedSchedulerBackend
> when running on Mesos with dynamic allocation on and limiting number of max
> executors by spark.dynamicAllocation.maxExecutors.
> Suppose we have long running driver that has cyclic pattern of resource
> consumption(with some idle times in between), due to dyn.allocation it
> receives offers and then releases them after current chunk of work processed.
> Since at
> [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573]
> the backend compares numExecutors < executorLimit and
> numExecutors is defined as slaves.values.map(_.taskIDs.size).sum and slaves
> holds all slaves ever "met", i.e. both active and killed (see comment
> [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L122)]
>
> On the other hand, number of taskIds should be updated due to statusUpdate,
> but suppose this update is lost(actually I don't see logs of 'is now
> TASK_KILLED') so this number of executors might be wrong
>
> I've created test that "reproduces" this behavior, not sure how good it is:
> {code:java}
> //MesosCoarseGrainedSchedulerBackendSuite
> test("max executors registered stops to accept offers when dynamic allocation
> enabled") {
> setBackend(Map(
> "spark.dynamicAllocation.maxExecutors" -> "1",
> "spark.dynamicAllocation.enabled" -> "true",
> "spark.dynamicAllocation.testing" -> "true"))
> backend.doRequestTotalExecutors(1)
> val (mem, cpu) = (backend.executorMemory(sc), 4)
> val offer1 = createOffer("o1", "s1", mem, cpu)
> backend.resourceOffers(driver, List(offer1).asJava)
> verifyTaskLaunched(driver, "o1")
> backend.doKillExecutors(List("0"))
> verify(driver, times(1)).killTask(createTaskId("0"))
> val offer2 = createOffer("o2", "s2", mem, cpu)
> backend.resourceOffers(driver, List(offer2).asJava)
> verify(driver, times(1)).declineOffer(offer2.getId)
> }{code}
>
>
> Workaround: Don't set maxExecutors with dynamicAllocation on
>
> Please advice
> Igor
> marking you friends since you were last to touch this piece of code and
> probably can advice something([~vanzin], [~skonto], [~susanxhuynh])
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]