[ 
https://issues.apache.org/jira/browse/SPARK-23423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364861#comment-16364861
 ] 

Stavros Kontopoulos edited comment on SPARK-23423 at 2/14/18 10:17 PM:
-----------------------------------------------------------------------

Hi [~igor.berman]. Looking at the code again I think when there is a status 
update tasksIds of dead tasks are removed:

[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L732]

Slaves are not removed but task Ids are, maybe something else is not working. 
Do you have a log at the time of the issue to attach?

The test you have is ok but I suspect it does not trigger deletion for the 
tasks in the case of a failure. i will check it.

Btw the behavior for checking the upper limit of the num of the executors you 
are referring to was added here: spark-16944.


was (Author: skonto):
Hi [~igor.berman]. Looking at the code again I think when there is a status 
update tasksIds of dead tasks are removed:

[https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L732]

Slaves are not removed but task Ids are, maybe something else is not working. 
Do you have a log at the time of the issue to attach?

The test you have is ok but does not trigger deletion for the tasks in the case 
of a failure.

Btw the behavior for checking the upper limit of the num of the executors you 
are referring to was added here: spark-16944.

> Application declines any offers when killed+active executors rich 
> spark.dynamicAllocation.maxExecutors
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-23423
>                 URL: https://issues.apache.org/jira/browse/SPARK-23423
>             Project: Spark
>          Issue Type: Bug
>          Components: Mesos, Spark Core
>    Affects Versions: 2.2.1
>            Reporter: Igor Berman
>            Priority: Major
>
> Hi
> I've noticed rather strange behavior of MesosCoarseGrainedSchedulerBackend 
> when running on Mesos with dynamic allocation on and limiting number of max 
> executors by spark.dynamicAllocation.maxExecutors.
> Suppose we have long running driver that has cyclic pattern of resource 
> consumption(with some idle times in between), due to dyn.allocation it 
> receives offers and then releases them after current chunk of work processed.
> Since at 
> [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573]
>  the backend compares numExecutors < executorLimit and 
> numExecutors is defined as slaves.values.map(_.taskIDs.size).sum and slaves 
> holds all slaves ever "met", i.e. both active and killed (see comment 
> [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L122)]
>  
> On the other hand, number of taskIds should be updated due to statusUpdate, 
> but suppose this update is lost(actually I don't see logs of 'is now 
> TASK_KILLED') so this number of executors might be wrong
>  
> I've created test that "reproduces" this behavior, not sure how good it is:
> {code:java}
> //MesosCoarseGrainedSchedulerBackendSuite
> test("max executors registered stops to accept offers when dynamic allocation 
> enabled") {
>   setBackend(Map(
>     "spark.dynamicAllocation.maxExecutors" -> "1",
>     "spark.dynamicAllocation.enabled" -> "true",
>     "spark.dynamicAllocation.testing" -> "true"))
>   backend.doRequestTotalExecutors(1)
>   val (mem, cpu) = (backend.executorMemory(sc), 4)
>   val offer1 = createOffer("o1", "s1", mem, cpu)
>   backend.resourceOffers(driver, List(offer1).asJava)
>   verifyTaskLaunched(driver, "o1")
>   backend.doKillExecutors(List("0"))
>   verify(driver, times(1)).killTask(createTaskId("0"))
>   val offer2 = createOffer("o2", "s2", mem, cpu)
>   backend.resourceOffers(driver, List(offer2).asJava)
>   verify(driver, times(1)).declineOffer(offer2.getId)
> }{code}
>  
>  
> Workaround: Don't set maxExecutors with dynamicAllocation on
>  
> Please advice
> Igor
> marking you friends since you were last to touch this piece of code and 
> probably can advice something([~vanzin], [~skonto], [~susanxhuynh])



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to