Repository: spark Updated Branches: refs/heads/master c9729187b -> c42ef9533
[SPARK-21456][MESOS] Make the driver failover_timeout configurable ## What changes were proposed in this pull request? Current behavior: in Mesos cluster mode, the driver failover_timeout is set to zero. If the driver temporarily loses connectivity with the Mesos master, the framework will be torn down and all executors killed. Proposed change: make the failover_timeout configurable via a new option, spark.mesos.driver.failoverTimeout. The default value is still zero. Note: with non-zero failover_timeout, an explicit teardown is needed in some cases. This is captured in https://issues.apache.org/jira/browse/SPARK-21458 ## How was this patch tested? Added a unit test to make sure the config option is set while creating the scheduler driver. Ran an integration test with mesosphere/spark showing that with a non-zero failover_timeout the Spark job finishes after a driver is disconnected from the master. Author: Susan X. Huynh <[email protected]> Closes #18674 from susanxhuynh/sh-mesos-failover-timeout. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c42ef953 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c42ef953 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c42ef953 Branch: refs/heads/master Commit: c42ef953343073a50ef04c5ce848b574ff7f2238 Parents: c972918 Author: Susan X. 
Huynh <[email protected]> Authored: Wed Jul 19 15:11:06 2017 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Wed Jul 19 15:11:06 2017 -0700 ---------------------------------------------------------------------- docs/running-on-mesos.md | 11 ++++++ .../org/apache/spark/deploy/mesos/config.scala | 9 ++++- .../MesosCoarseGrainedSchedulerBackend.scala | 3 +- ...esosCoarseGrainedSchedulerBackendSuite.scala | 36 ++++++++++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/docs/running-on-mesos.md ---------------------------------------------------------------------- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 7401b63..cf257c0 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config Fetcher Cache</a> </td> </tr> +<tr> + <td><code>spark.mesos.driver.failoverTimeout</code></td> + <td><code>0.0</code></td> + <td> + The amount of time (in seconds) that the master will wait for the + driver to reconnect, after being temporarily disconnected, before + it tears down the driver framework by killing all its + executors. The default value is zero, meaning the master does not wait at all: if the + driver disconnects, the master immediately tears down the framework. 
+ </td> +</tr> </table> # Troubleshooting and Debugging http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 56d697f..6c8619e 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -58,9 +58,16 @@ package object config { private [spark] val DRIVER_LABELS = ConfigBuilder("spark.mesos.driver.labels") - .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" + + .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " + "pairs should be separated by a colon, and commas used to list more than one." + "Ex. 
key:value,key2:value2") .stringConf .createOptional + + private [spark] val DRIVER_FAILOVER_TIMEOUT = + ConfigBuilder("spark.mesos.driver.failoverTimeout") + .doc("Amount of time in seconds that the master will wait to hear from the driver, " + + "during a temporary disconnection, before tearing down all the executors.") + .doubleConf + .createWithDefault(0.0) } http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 7dd42c4..6e7f41d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), None, - None, + Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)), sc.conf.getOption("spark.mesos.driver.frameworkId") ) 
http://git-wip-us.apache.org/repos/asf/spark/blob/c42ef953/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 7cca5fe..d9ff4a4 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} @@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("failover timeout is set in created scheduler driver") { + val failoverTimeoutIn = 3600.0 + initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString)) + sc = new SparkContext(sparkConf) + + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + 
sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(failoverTimeout.isDefined) + assert(failoverTimeout.get.equals(failoverTimeoutIn)) + driver + } + } + + backend.start() + } + test("honors unset spark.mesos.containerizer") { setBackend(Map("spark.mesos.executor.docker.image" -> "test")) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
