Igor Berman created SPARK-23423:
-----------------------------------

             Summary: Application declines any offers when killed+active executors reach spark.dynamicAllocation.maxExecutors
                 Key: SPARK-23423
                 URL: https://issues.apache.org/jira/browse/SPARK-23423
             Project: Spark
          Issue Type: Bug
          Components: Mesos, Spark Core
    Affects Versions: 2.2.1
            Reporter: Igor Berman
Hi

I've noticed rather strange behavior in MesosCoarseGrainedSchedulerBackend when running on Mesos with dynamic allocation enabled and the number of executors capped by spark.dynamicAllocation.maxExecutors.

Suppose we have a long-running driver with a cyclic pattern of resource consumption (with some idle periods in between); due to dynamic allocation it acquires executors and then releases them after each chunk of work is processed.

At [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L573] the backend checks numExecutors < executorLimit, where numExecutors is defined as slaves.values.map(_.taskIDs.size).sum and slaves holds all executors ever "met", i.e. both active and killed (see the comment at [https://github.com/apache/spark/blob/master/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L122]). This means that after a while, once the number of killed executors alone exceeds maxExecutors, the application declines every offer and effectively stops working. A minimal model of this accounting is sketched in the first code block at the end of this message.

I've created a test that "reproduces" this behavior; not sure how good it is:

{code:java}
//MesosCoarseGrainedSchedulerBackendSuite
test("max executors registered stops to accept offers when dynamic allocation enabled") {
  setBackend(Map(
    "spark.dynamicAllocation.maxExecutors" -> "1",
    "spark.dynamicAllocation.enabled" -> "true",
    "spark.dynamicAllocation.testing" -> "true"))

  backend.doRequestTotalExecutors(1)

  val (mem, cpu) = (backend.executorMemory(sc), 4)
  val offer1 = createOffer("o1", "s1", mem, cpu)
  backend.resourceOffers(driver, List(offer1).asJava)
  verifyTaskLaunched(driver, "o1")

  backend.doKillExecutors(List("0"))
  verify(driver, times(1)).killTask(createTaskId("0"))

  val offer2 = createOffer("o2", "s2", mem, cpu)
  backend.resourceOffers(driver, List(offer2).asJava)
  verify(driver, times(1)).declineOffer(offer2.getId)
}
{code}

Workaround: don't set spark.dynamicAllocation.maxExecutors when dynamic allocation is on.

I'm not sure how to solve this problem, since it doesn't seem trivial to change numExecutors in this scenario to count only active executors (this information is not available in the Slave class); one rough direction is sketched in the second code block below. On the other hand, it might be that this behavior is "normal" and expected. Please advise.

Igor

[~vanzin], [~skonto], [~susanxhuynh]: marking you, friends, since you were the last to touch this piece of code and can probably advise something.
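To make the accounting concrete, here is a minimal self-contained model of the counting; this is just my reading of the check at L573 and the slaves map, not actual Spark code:

{code:java}
import scala.collection.mutable

object CountingModel extends App {
  // Not Spark code: a toy model of the L573 check, based on my reading that a
  // slave's taskIDs set keeps every task ever launched on it, killed or not.
  case class Slave(taskIDs: mutable.Set[String] = mutable.Set.empty)

  val executorLimit = 1
  val slaves = mutable.Map("s1" -> Slave())

  // Executor "0" is launched on s1 and later killed; its taskID stays in the set.
  slaves("s1").taskIDs += "0"

  val numExecutors = slaves.values.map(_.taskIDs.size).sum
  // numExecutors == 1 although nothing is running, so numExecutors < executorLimit
  // is false and every subsequent offer is declined.
  println(s"numExecutors=$numExecutors, accepts offers: ${numExecutors < executorLimit}")
}
{code}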
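And a rough, untested sketch of one possible direction, in the same toy model: keep a separate set of active task IDs, remove entries when statusUpdate sees a terminal state (TASK_FINISHED/TASK_FAILED/TASK_KILLED/TASK_LOST), and base the L573 comparison on that set. The activeTaskIDs field is hypothetical; nothing like it exists in the current code:

{code:java}
import scala.collection.mutable

object ActiveCountModel extends App {
  // Hypothetical sketch, not a patch: the real backend would need this
  // bookkeeping inside Slave plus wiring in statusUpdate().
  val executorLimit = 1
  val allTaskIDs    = mutable.Set.empty[String] // what slaves effectively track today
  val activeTaskIDs = mutable.Set.empty[String] // proposed: only live tasks

  def launch(taskId: String): Unit = {
    allTaskIDs += taskId
    activeTaskIDs += taskId
  }

  // Would be driven from statusUpdate() on terminal Mesos task states.
  def onTerminalStatus(taskId: String): Unit = activeTaskIDs -= taskId

  launch("0")
  onTerminalStatus("0") // executor "0" is killed

  println(s"current check accepts offers:  ${allTaskIDs.size < executorLimit}")    // false
  println(s"sketched check accepts offers: ${activeTaskIDs.size < executorLimit}") // true
}
{code}

The obvious risk is that statusUpdate and the offer path would have to agree on what counts as terminal, which is exactly the bookkeeping I wasn't sure about above.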