http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala deleted file mode 100644 index ec47ab1..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import scala.collection.JavaConverters._ -import scala.language.reflectiveCalls - -import org.apache.mesos.Protos.{Resource, Value} -import org.mockito.Mockito._ -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.internal.config._ - -class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { - - // scalastyle:off structural.type - // this is the documented way of generating fixtures in scalatest - def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new { - val sparkConf = new SparkConf - val sc = mock[SparkContext] - when(sc.conf).thenReturn(sparkConf) - } - - private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = { - val rangeValue = Value.Range.newBuilder() - rangeValue.setBegin(range._1) - rangeValue.setEnd(range._2) - val builder = Resource.newBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) - - role.foreach { r => builder.setRole(r) } - builder.build() - } - - private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = { - resources.flatMap{resource => resource.getRanges.getRangeList - .asScala.map(range => (range.getBegin, range.getEnd))} - } - - def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)]) - : Boolean = { - array1.sortBy(identity).deep == array2.sortBy(identity).deep - } - - def arePortsEqual(array1: Array[Long], array2: Array[Long]) - : Boolean = { - array1.sortBy(identity).deep == array2.sortBy(identity).deep - } - - def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = { - resources.flatMap{ resource => - resource.getRanges.getRangeList.asScala.toList.map{ - range => (range.getBegin, range.getEnd)}} - } - - val utils = new MesosSchedulerUtils { } - // scalastyle:on structural.type - - test("use at-least minimum overhead") { - val f = fixture - when(f.sc.executorMemory).thenReturn(512) - 
utils.executorMemory(f.sc) shouldBe 896 - } - - test("use overhead if it is greater than minimum value") { - val f = fixture - when(f.sc.executorMemory).thenReturn(4096) - utils.executorMemory(f.sc) shouldBe 4505 - } - - test("use spark.mesos.executor.memoryOverhead (if set)") { - val f = fixture - when(f.sc.executorMemory).thenReturn(1024) - f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") - utils.executorMemory(f.sc) shouldBe 1536 - } - - test("parse a non-empty constraint string correctly") { - val expectedMap = Map( - "os" -> Set("centos7"), - "zone" -> Set("us-east-1a", "us-east-1b") - ) - utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap) - } - - test("parse an empty constraint string correctly") { - utils.parseConstraintString("") shouldBe Map() - } - - test("throw an exception when the input is malformed") { - an[IllegalArgumentException] should be thrownBy - utils.parseConstraintString("os;zone:us-east") - } - - test("empty values for attributes' constraints matches all values") { - val constraintsStr = "os:" - val parsedConstraints = utils.parseConstraintString(constraintsStr) - - parsedConstraints shouldBe Map("os" -> Set()) - - val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build() - val noOsOffer = Map("zone" -> zoneSet) - val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build()) - val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build()) - - utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false - utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true - utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true - } - - test("subset match is performed for set attributes") { - val supersetConstraint = Map( - "os" -> Value.Text.newBuilder().setValue("ubuntu").build(), - "zone" -> Value.Set.newBuilder() - .addItem("us-east-1a") - .addItem("us-east-1b") - .addItem("us-east-1c") - .build()) - - val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c" - val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) - - utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true - } - - test("less than equal match is performed on scalar attributes") { - val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build()) - - val ltConstraint = utils.parseConstraintString("gpus:2") - val eqConstraint = utils.parseConstraintString("gpus:3") - val gtConstraint = utils.parseConstraintString("gpus:4") - - utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true - utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true - utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false - } - - test("contains match is performed for range attributes") { - val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build()) - val ltConstraint = utils.parseConstraintString("ports:6000") - val eqConstraint = utils.parseConstraintString("ports:7500") - val gtConstraint = utils.parseConstraintString("ports:8002") - val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300") - - utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false - utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true - utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false - 
utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true - } - - test("equality match is performed for text attributes") { - val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build()) - - val trueConstraint = utils.parseConstraintString("os:centos7") - val falseConstraint = utils.parseConstraintString("os:ubuntu") - - utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true - utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false - } - - test("Port reservation is done correctly with user specified ports only") { - val conf = new SparkConf() - conf.set("spark.executor.port", "3000" ) - conf.set(BLOCK_MANAGER_PORT, 4000) - val portResource = createTestPortResource((3000, 5000), Some("my_role")) - - val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(3000, 4000), List(portResource)) - resourcesToBeUsed.length shouldBe 2 - - val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray - - portsToUse.length shouldBe 2 - arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true - - val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) - - val expectedUSed = Array((3000L, 3000L), (4000L, 4000L)) - - arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true - } - - test("Port reservation is done correctly with some user specified ports (spark.executor.port)") { - val conf = new SparkConf() - conf.set("spark.executor.port", "3100" ) - val portResource = createTestPortResource((3000, 5000), Some("my_role")) - - val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(3100), List(portResource)) - - val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} - - portsToUse.length shouldBe 1 - portsToUse.contains(3100) shouldBe true - } - - test("Port reservation is done correctly with all random ports") { - val conf = new SparkConf() - val portResource = createTestPortResource((3000L, 5000L), Some("my_role")) - - val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(), List(portResource)) - val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} - - portsToUse.isEmpty shouldBe true - } - - test("Port reservation is done correctly with user specified ports only - multiple ranges") { - val conf = new SparkConf() - conf.set("spark.executor.port", "2100" ) - conf.set("spark.blockManager.port", "4000") - val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), - createTestPortResource((2000, 2500), Some("other_role"))) - val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(2100, 4000), portResourceList) - val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} - - portsToUse.length shouldBe 2 - val portsRangesLeft = rangesResourcesToTuple(resourcesLeft) - val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) - - val expectedUsed = Array((2100L, 2100L), (4000L, 4000L)) - - arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true - arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true - } - - test("Port reservation is done correctly with all random ports - multiple ranges") { - val conf = new SparkConf() - val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), - createTestPortResource((2000, 2500), Some("other_role"))) - val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(), portResourceList) - val 
portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} - portsToUse.isEmpty shouldBe true - } -}
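
The first three tests in the deleted suite above pin down the executor-memory sizing rule: a 10% overhead with a 384 MB floor, unless spark.mesos.executor.memoryOverhead overrides it. The following is a minimal sketch of that rule, not the Spark source; the object and constant names (ExecutorMemorySketch, OverheadFraction, OverheadMinimumMb) are illustrative assumptions chosen only to reproduce the expected values 896, 4505, and 1536.

// Sketch only -- reproduces the arithmetic the suite asserts, under the
// assumption of a 10% overhead fraction and a 384 MB minimum overhead.
object ExecutorMemorySketch {
  private val OverheadFraction = 0.10   // assumed fraction of executor memory
  private val OverheadMinimumMb = 384   // assumed minimum overhead in MB

  // Total MB to request for an executor, with an optional explicit overhead
  // (corresponding to spark.mesos.executor.memoryOverhead in the tests).
  def executorMemory(executorMemoryMb: Int, overheadOverrideMb: Option[Int] = None): Int = {
    val overhead = overheadOverrideMb.getOrElse(
      math.max((OverheadFraction * executorMemoryMb).toInt, OverheadMinimumMb))
    executorMemoryMb + overhead
  }
}

// Matches the suite's expectations:
//   executorMemory(512)              == 896   (384 MB floor applies)
//   executorMemory(4096)             == 4505  (10% of 4096 = 409 MB exceeds the floor)
//   executorMemory(1024, Some(512))  == 1536  (explicit overhead override)
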
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala deleted file mode 100644 index 5a81bb3..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.nio.ByteBuffer - -import org.apache.spark.SparkFunSuite - -class MesosTaskLaunchDataSuite extends SparkFunSuite { - test("serialize and deserialize data must be same") { - val serializedTask = ByteBuffer.allocate(40) - (Range(100, 110).map(serializedTask.putInt(_))) - serializedTask.rewind - val attemptNumber = 100 - val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString - serializedTask.rewind - val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString) - assert(mesosTaskLaunchData.attemptNumber == attemptNumber) - assert(mesosTaskLaunchData.serializedTask.equals(serializedTask)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala deleted file mode 100644 index 7ebb294..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.util.Collections - -import scala.collection.JavaConverters._ - -import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} -import org.apache.mesos.SchedulerDriver -import org.mockito.{ArgumentCaptor, Matchers} -import org.mockito.Mockito._ - -object Utils { - def createOffer( - offerId: String, - slaveId: String, - mem: Int, - cpus: Int, - ports: Option[(Long, Long)] = None, - gpus: Int = 0): Offer = { - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(mem)) - builder.addResourcesBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(cpus)) - ports.foreach { resourcePorts => - builder.addResourcesBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() - .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) - } - if (gpus > 0) { - builder.addResourcesBuilder() - .setName("gpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(gpus)) - } - builder.setId(createOfferId(offerId)) - .setFrameworkId(FrameworkID.newBuilder() - .setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) - .setHostname(s"host${slaveId}") - .build() - } - - def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = { - val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]]) - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(createOfferId(offerId))), - captor.capture()) - captor.getValue.asScala.toList - } - - def createOfferId(offerId: String): OfferID = { - OfferID.newBuilder().setValue(offerId).build() - } - - def createSlaveId(slaveId: String): SlaveID = { - SlaveID.newBuilder().setValue(slaveId).build() - } - - def createExecutorId(executorId: String): ExecutorID = { - ExecutorID.newBuilder().setValue(executorId).build() - } - - def createTaskId(taskId: String): TaskID = { - TaskID.newBuilder().setValue(taskId).build() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2f61d33..5e7dc15 100644 --- a/pom.xml +++ b/pom.xml @@ -2580,7 +2580,7 @@ <profile> <id>yarn</id> <modules> - <module>yarn</module> + <module>resource-managers/yarn</module> <module>common/network-yarn</module> </modules> </profile> @@ -2588,7 +2588,7 @@ <profile> <id>mesos</id> <modules> - <module>mesos</module> + <module>resource-managers/mesos</module> </modules> </profile> http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/pom.xml ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml new file mode 100644 index 0000000..c0a8f9a --- /dev/null +++ b/resource-managers/mesos/pom.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>spark-mesos_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Project Mesos</name> + <properties> + <sbt.project.name>mesos</sbt.project.name> + <mesos.version>1.0.0</mesos.version> + <mesos.classifier>shaded-protobuf</mesos.classifier> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + <version>${mesos.version}</version> + <classifier>${mesos.classifier}</classifier> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + + <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-plus</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlets</artifactId> + </dependency> + <!-- End of shaded deps. 
--> + + </dependencies> + + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager new file mode 100644 index 0000000..12b6d5b --- /dev/null +++ b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -0,0 +1 @@ +org.apache.spark.scheduler.cluster.mesos.MesosClusterManager http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala new file mode 100644 index 0000000..792ade8 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.util.concurrent.CountDownLatch + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.deploy.mesos.ui.MesosClusterUI +import org.apache.spark.deploy.rest.mesos.MesosRestServer +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.cluster.mesos._ +import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, Utils} + +/* + * A dispatcher that is responsible for managing and launching drivers, and is intended to be + * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in + * the cluster independently of Spark applications. + * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a + * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master + * for resources. 
+ * + * A typical new driver lifecycle is the following: + * - Driver submitted via spark-submit talking to the [[MesosRestServer]] + * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]] + * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue + * + * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable + * per driver launched. + * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as + * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and + * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively. + */ +private[mesos] class MesosClusterDispatcher( + args: MesosClusterDispatcherArguments, + conf: SparkConf) + extends Logging { + + private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host) + private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase() + logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode) + + private val engineFactory = recoveryMode match { + case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory + case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf) + case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode) + } + + private val scheduler = new MesosClusterScheduler(engineFactory, conf) + + private val server = new MesosRestServer(args.host, args.port, conf, scheduler) + private val webUi = new MesosClusterUI( + new SecurityManager(conf), + args.webUiPort, + conf, + publicAddress, + scheduler) + + private val shutdownLatch = new CountDownLatch(1) + + def start(): Unit = { + webUi.bind() + scheduler.frameworkUrl = conf.get(DISPATCHER_WEBUI_URL).getOrElse(webUi.activeWebUiUrl) + scheduler.start() + server.start() + } + + def awaitShutdown(): Unit = { + shutdownLatch.await() + } + + def stop(): Unit = { + webUi.stop() + server.stop() + scheduler.stop() + shutdownLatch.countDown() + } +} + +private[mesos] object MesosClusterDispatcher + extends Logging + with CommandLineUtils { + + override def main(args: Array[String]) { + Utils.initDaemon(log) + val conf = new SparkConf + val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) + conf.setMaster(dispatcherArgs.masterUrl) + conf.setAppName(dispatcherArgs.name) + dispatcherArgs.zookeeperUrl.foreach { z => + conf.set(RECOVERY_MODE, "ZOOKEEPER") + conf.set(ZOOKEEPER_URL, z) + } + val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) + dispatcher.start() + logDebug("Adding shutdown hook") // force eager creation of logger + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutdown hook is shutting down dispatcher") + dispatcher.stop() + dispatcher.awaitShutdown() + } + dispatcher.awaitShutdown() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala new file mode 100644 index 0000000..ef08502 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import scala.annotation.tailrec +import scala.collection.mutable + +import org.apache.spark.util.{IntParam, Utils} +import org.apache.spark.SparkConf + +private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) { + var host: String = Utils.localHostName() + var port: Int = 7077 + var name: String = "Spark Cluster" + var webUiPort: Int = 8081 + var verbose: Boolean = false + var masterUrl: String = _ + var zookeeperUrl: Option[String] = None + var propertiesFile: String = _ + val confProperties: mutable.HashMap[String, String] = + new mutable.HashMap[String, String]() + + parse(args.toList) + + // scalastyle:on println + propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) + Utils.updateSparkConfigFromProperties(conf, confProperties) + + // scalastyle:off println + if (verbose) { + MesosClusterDispatcher.printStream.println(s"Using host: $host") + MesosClusterDispatcher.printStream.println(s"Using port: $port") + MesosClusterDispatcher.printStream.println(s"Using webUiPort: $webUiPort") + MesosClusterDispatcher.printStream.println(s"Framework Name: $name") + + Option(propertiesFile).foreach { file => + MesosClusterDispatcher.printStream.println(s"Using properties file: $file") + } + + MesosClusterDispatcher.printStream.println(s"Spark Config properties set:") + conf.getAll.foreach(println) + } + + @tailrec + private def parse(args: List[String]): Unit = args match { + case ("--host" | "-h") :: value :: tail => + Utils.checkHost(value, "Please use hostname " + value) + host = value + parse(tail) + + case ("--port" | "-p") :: IntParam(value) :: tail => + port = value + parse(tail) + + case ("--webui-port") :: IntParam(value) :: tail => + webUiPort = value + parse(tail) + + case ("--zk" | "-z") :: value :: tail => + zookeeperUrl = Some(value) + parse(tail) + + case ("--master" | "-m") :: value :: tail => + if (!value.startsWith("mesos://")) { + // scalastyle:off println + MesosClusterDispatcher.printStream + .println("Cluster dispatcher only supports mesos (uri begins with mesos://)") + // scalastyle:on println + MesosClusterDispatcher.exitFn(1) + } + masterUrl = value.stripPrefix("mesos://") + parse(tail) + + case ("--name") :: value :: tail => + name = value + parse(tail) + + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + + case ("--conf") :: value :: tail => + val pair = MesosClusterDispatcher. 
+ parseSparkConfProperty(value) + confProperties(pair._1) = pair._2 + parse(tail) + + case ("--help") :: tail => + printUsageAndExit(0) + + case ("--verbose") :: tail => + verbose = true + parse(tail) + + case Nil => + if (Option(masterUrl).isEmpty) { + // scalastyle:off println + MesosClusterDispatcher.printStream.println("--master is required") + // scalastyle:on println + printUsageAndExit(1) + } + + case value => + // scalastyle:off println + MesosClusterDispatcher.printStream.println(s"Unrecognized option: '${value.head}'") + // scalastyle:on println + printUsageAndExit(1) + } + + private def printUsageAndExit(exitCode: Int): Unit = { + val outStream = MesosClusterDispatcher.printStream + + // scalastyle:off println + outStream.println( + "Usage: MesosClusterDispatcher [options]\n" + + "\n" + + "Options:\n" + + " -h HOST, --host HOST Hostname to listen on\n" + + " --help Show this help message and exit.\n" + + " --verbose, Print additional debug output.\n" + + " -p PORT, --port PORT Port to listen on (default: 7077)\n" + + " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" + + " --name NAME Framework name to show in Mesos UI\n" + + " -m --master MASTER URI for connecting to Mesos master\n" + + " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" + + " Zookeeper for persistence\n" + + " --properties-file FILE Path to a custom Spark properties file.\n" + + " Default is conf/spark-defaults.conf \n" + + " --conf PROP=VALUE Arbitrary Spark configuration property.\n" + + " Takes precedence over defined properties in properties-file.") + // scalastyle:on println + MesosClusterDispatcher.exitFn(exitCode) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala new file mode 100644 index 0000000..d4c7022 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.util.Date + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.Command +import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState + +/** + * Describes a Spark driver that is submitted from the + * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by + * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. 
+ * @param jarUrl URL to the application jar + * @param mem Amount of memory for the driver + * @param cores Number of cores for the driver + * @param supervise Supervise the driver for long running app + * @param command The command to launch the driver. + * @param schedulerProperties Extra properties to pass the Mesos scheduler + */ +private[spark] class MesosDriverDescription( + val name: String, + val jarUrl: String, + val mem: Int, + val cores: Double, + val supervise: Boolean, + val command: Command, + schedulerProperties: Map[String, String], + val submissionId: String, + val submissionDate: Date, + val retryState: Option[MesosClusterRetryState] = None) + extends Serializable { + + val conf = new SparkConf(false) + schedulerProperties.foreach {case (k, v) => conf.set(k, v)} + + def copy( + name: String = name, + jarUrl: String = jarUrl, + mem: Int = mem, + cores: Double = cores, + supervise: Boolean = supervise, + command: Command = command, + schedulerProperties: SparkConf = conf, + submissionId: String = submissionId, + submissionDate: Date = submissionDate, + retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = { + + new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap, + submissionId, submissionDate, retryState) + } + + override def toString: String = s"MesosDriverDescription (${command.mainClass})" +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala new file mode 100644 index 0000000..859aa83 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.mesos + +import java.nio.ByteBuffer +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.internal.Logging +import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage +import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} +import org.apache.spark.network.util.TransportConf +import org.apache.spark.util.ThreadUtils + +/** + * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. + * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. + */ +private[mesos] class MesosExternalShuffleBlockHandler( + transportConf: TransportConf, + cleanerIntervalS: Long) + extends ExternalShuffleBlockHandler(transportConf, null) with Logging { + + ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") + .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) + + // Stores a map of app id to app state (timeout value and last heartbeat) + private val connectedApps = new ConcurrentHashMap[String, AppState]() + + protected override def handleMessage( + message: BlockTransferMessage, + client: TransportClient, + callback: RpcResponseCallback): Unit = { + message match { + case RegisterDriverParam(appId, appState) => + val address = client.getSocketAddress + val timeout = appState.heartbeatTimeout + logInfo(s"Received registration request from app $appId (remote address $address, " + + s"heartbeat timeout $timeout ms).") + if (connectedApps.containsKey(appId)) { + logWarning(s"Received a registration request from app $appId, but it was already " + + s"registered") + } + connectedApps.put(appId, appState) + callback.onSuccess(ByteBuffer.allocate(0)) + case Heartbeat(appId) => + val address = client.getSocketAddress + Option(connectedApps.get(appId)) match { + case Some(existingAppState) => + logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + + s"address $address).") + existingAppState.lastHeartbeat = System.nanoTime() + case None => + logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + + s"address $address, appId '$appId').") + } + case _ => super.handleMessage(message, client, callback) + } + } + + /** An extractor object for matching [[RegisterDriver]] message. */ + private object RegisterDriverParam { + def unapply(r: RegisterDriver): Option[(String, AppState)] = + Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) + } + + private object Heartbeat { + def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) + } + + private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) + + private class CleanerThread extends Runnable { + override def run(): Unit = { + val now = System.nanoTime() + connectedApps.asScala.foreach { case (appId, appState) => + if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { + logInfo(s"Application $appId timed out. 
Removing shuffle files.") + connectedApps.remove(appId) + applicationRemoved(appId, true) + } + } + } + } +} + +/** + * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers + * to associate with. This allows the shuffle service to detect when a driver is terminated + * and can clean up the associated shuffle files. + */ +private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager) + extends ExternalShuffleService(conf, securityManager) { + + protected override def newShuffleBlockHandler( + conf: TransportConf): ExternalShuffleBlockHandler = { + val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S) + new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) + } +} + +private[spark] object MesosExternalShuffleService extends Logging { + + def main(args: Array[String]): Unit = { + ExternalShuffleService.main(args, + (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm)) + } +} + + http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala new file mode 100644 index 0000000..19e2533 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.config.ConfigBuilder + +package object config { + + /* Common app configuration. */ + + private[spark] val SHUFFLE_CLEANER_INTERVAL_S = + ConfigBuilder("spark.shuffle.cleaner.interval") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("30s") + + private[spark] val RECOVERY_MODE = + ConfigBuilder("spark.deploy.recoveryMode") + .stringConf + .createWithDefault("NONE") + + private[spark] val DISPATCHER_WEBUI_URL = + ConfigBuilder("spark.mesos.dispatcher.webui.url") + .doc("Set the Spark Mesos dispatcher webui_url for interacting with the " + + "framework. If unset it will point to Spark's internal web UI.") + .stringConf + .createOptional + + private[spark] val ZOOKEEPER_URL = + ConfigBuilder("spark.deploy.zookeeper.url") + .doc("When `spark.deploy.recoveryMode` is set to ZOOKEEPER, this " + + "configuration is used to set the zookeeper URL to connect to.") + .stringConf + .createOptional + + private[spark] val HISTORY_SERVER_URL = + ConfigBuilder("spark.mesos.dispatcher.historyServer.url") + .doc("Set the URL of the history server. 
The dispatcher will then " + + "link each driver to its entry in the history server.") + .stringConf + .createOptional + +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala new file mode 100644 index 0000000..cd98110 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState} +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") { + + override def render(request: HttpServletRequest): Seq[Node] = { + val driverId = request.getParameter("id") + require(driverId != null && driverId.nonEmpty, "Missing id parameter") + + val state = parent.scheduler.getDriverState(driverId) + if (state.isEmpty) { + val content = + <div> + <p>Cannot find driver {driverId}</p> + </div> + return UIUtils.basicSparkPage(content, s"Details for Job $driverId") + } + val driverState = state.get + val driverHeaders = Seq("Driver property", "Value") + val schedulerHeaders = Seq("Scheduler property", "Value") + val commandEnvHeaders = Seq("Command environment variable", "Value") + val launchedHeaders = Seq("Launched property", "Value") + val commandHeaders = Seq("Command property", "Value") + val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count") + val driverDescription = Iterable.apply(driverState.description) + val submissionState = Iterable.apply(driverState.submissionState) + val command = Iterable.apply(driverState.description.command) + val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap) + val commandEnv = Iterable.apply(driverState.description.command.environment) + val driverTable = + UIUtils.listingTable(driverHeaders, driverRow, driverDescription) + val commandTable = + UIUtils.listingTable(commandHeaders, commandRow, command) + val commandEnvTable = + UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv) + val schedulerTable = + UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties) + val launchedTable = + 
UIUtils.listingTable(launchedHeaders, launchedRow, submissionState) + val retryTable = + UIUtils.listingTable( + retryHeaders, retryRow, Iterable.apply(driverState.description.retryState)) + val content = + <p>Driver state information for driver id {driverId}</p> + <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a> + <div class="row-fluid"> + <div class="span12"> + <h4>Driver state: {driverState.state}</h4> + <h4>Driver properties</h4> + {driverTable} + <h4>Driver command</h4> + {commandTable} + <h4>Driver command environment</h4> + {commandEnvTable} + <h4>Scheduler properties</h4> + {schedulerTable} + <h4>Launched state</h4> + {launchedTable} + <h4>Retry state</h4> + {retryTable} + </div> + </div>; + + UIUtils.basicSparkPage(content, s"Details for Job $driverId") + } + + private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = { + submissionState.map { state => + <tr> + <td>Mesos Slave ID</td> + <td>{state.slaveId.getValue}</td> + </tr> + <tr> + <td>Mesos Task ID</td> + <td>{state.taskId.getValue}</td> + </tr> + <tr> + <td>Launch Time</td> + <td>{state.startDate}</td> + </tr> + <tr> + <td>Finish Time</td> + <td>{state.finishDate.map(_.toString).getOrElse("")}</td> + </tr> + <tr> + <td>Last Task Status</td> + <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> + </tr> + }.getOrElse(Seq[Node]()) + } + + private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { + properties.map { case (k, v) => + <tr> + <td>{k}</td><td>{v}</td> + </tr> + }.toSeq + } + + private def commandRow(command: Command): Seq[Node] = { + <tr> + <td>Main class</td><td>{command.mainClass}</td> + </tr> + <tr> + <td>Arguments</td><td>{command.arguments.mkString(" ")}</td> + </tr> + <tr> + <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td> + </tr> + <tr> + <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td> + </tr> + <tr> + <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td> + </tr> + } + + private def driverRow(driver: MesosDriverDescription): Seq[Node] = { + <tr> + <td>Name</td><td>{driver.name}</td> + </tr> + <tr> + <td>Id</td><td>{driver.submissionId}</td> + </tr> + <tr> + <td>Cores</td><td>{driver.cores}</td> + </tr> + <tr> + <td>Memory</td><td>{driver.mem}</td> + </tr> + <tr> + <td>Submitted</td><td>{driver.submissionDate}</td> + </tr> + <tr> + <td>Supervise</td><td>{driver.supervise}</td> + </tr> + } + + private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = { + retryState.map { state => + <tr> + <td> + {state.lastFailureStatus} + </td> + <td> + {state.nextRetry} + </td> + <td> + {state.retries} + </td> + </tr> + }.getOrElse(Seq[Node]()) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala new file mode 100644 index 0000000..13ba7d3 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.mesos.Protos.TaskStatus + +import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") { + private val historyServerURL = parent.conf.get(HISTORY_SERVER_URL) + + def render(request: HttpServletRequest): Seq[Node] = { + val state = parent.scheduler.getSchedulerState() + + val driverHeader = Seq("Driver ID") + val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil) + val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources") + + val queuedHeaders = driverHeader ++ submissionHeader + val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++ + Seq("Start Date", "Mesos Slave ID", "State") + val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++ + Seq("Last Failed Status", "Next Retry Time", "Attempt Count") + val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers) + val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers) + val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers) + val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers) + val content = + <p>Mesos Framework ID: {state.frameworkId}</p> + <div class="row-fluid"> + <div class="span12"> + <h4>Queued Drivers:</h4> + {queuedTable} + <h4>Launched Drivers:</h4> + {launchedTable} + <h4>Finished Drivers:</h4> + {finishedTable} + <h4>Supervise drivers waiting for retry:</h4> + {retryTable} + </div> + </div>; + UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster") + } + + private def queuedRow(submission: MesosDriverDescription): Seq[Node] = { + val id = submission.submissionId + <tr> + <td><a href={s"driver?id=$id"}>{id}</a></td> + <td>{submission.submissionDate}</td> + <td>{submission.command.mainClass}</td> + <td>cpus: {submission.cores}, mem: {submission.mem}</td> + </tr> + } + + private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { + val id = state.driverDescription.submissionId + + val historyCol = if (historyServerURL.isDefined) { + <td> + <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}> + {state.frameworkId} + </a> + </td> + } else Nil + + <tr> + <td><a href={s"driver?id=$id"}>{id}</a></td> + {historyCol} + <td>{state.driverDescription.submissionDate}</td> + <td>{state.driverDescription.command.mainClass}</td> + <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td> + 
<td>{state.startDate}</td> + <td>{state.slaveId.getValue}</td> + <td>{stateString(state.mesosTaskStatus)}</td> + </tr> + } + + private def retryRow(submission: MesosDriverDescription): Seq[Node] = { + val id = submission.submissionId + <tr> + <td><a href={s"driver?id=$id"}>{id}</a></td> + <td>{submission.submissionDate}</td> + <td>{submission.command.mainClass}</td> + <td>{submission.retryState.get.lastFailureStatus}</td> + <td>{submission.retryState.get.nextRetry}</td> + <td>{submission.retryState.get.retries}</td> + </tr> + } + + private def stateString(status: Option[TaskStatus]): String = { + if (status.isEmpty) { + return "" + } + val sb = new StringBuilder + val s = status.get + sb.append(s"State: ${s.getState}") + if (status.get.hasMessage) { + sb.append(s", Message: ${s.getMessage}") + } + if (status.get.hasHealthy) { + sb.append(s", Healthy: ${s.getHealthy}") + } + if (status.get.hasSource) { + sb.append(s", Source: ${s.getSource}") + } + if (status.get.hasReason) { + sb.append(s", Reason: ${s.getReason}") + } + if (status.get.hasTimestamp) { + sb.append(s", Time: ${s.getTimestamp}") + } + sb.toString() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala new file mode 100644 index 0000000..6049789 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.mesos.ui + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler +import org.apache.spark.ui.{SparkUI, WebUI} +import org.apache.spark.ui.JettyUtils._ + +/** + * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]] + */ +private[spark] class MesosClusterUI( + securityManager: SecurityManager, + port: Int, + val conf: SparkConf, + dispatcherPublicAddress: String, + val scheduler: MesosClusterScheduler) + extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { + + initialize() + + def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort + + override def initialize() { + attachPage(new MesosClusterPage(this)) + attachPage(new DriverPage(this)) + attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static")) + } +} + +private object MesosClusterUI { + val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala new file mode 100644 index 0000000..ff60b88 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest.mesos + +import java.io.File +import java.text.SimpleDateFormat +import java.util.{Date, Locale} +import java.util.concurrent.atomic.AtomicLong +import javax.servlet.http.HttpServletResponse + +import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.deploy.rest._ +import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler +import org.apache.spark.util.Utils + +/** + * A server that responds to requests submitted by the [[RestSubmissionClient]]. + * All requests are forwarded to + * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. + * This is intended to be used in Mesos cluster mode only. + * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs. 
+ */ +private[spark] class MesosRestServer( + host: String, + requestedPort: Int, + masterConf: SparkConf, + scheduler: MesosClusterScheduler) + extends RestSubmissionServer(host, requestedPort, masterConf) { + + protected override val submitRequestServlet = + new MesosSubmitRequestServlet(scheduler, masterConf) + protected override val killRequestServlet = + new MesosKillRequestServlet(scheduler, masterConf) + protected override val statusRequestServlet = + new MesosStatusRequestServlet(scheduler, masterConf) +} + +private[mesos] class MesosSubmitRequestServlet( + scheduler: MesosClusterScheduler, + conf: SparkConf) + extends SubmitRequestServlet { + + private val DEFAULT_SUPERVISE = false + private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb + private val DEFAULT_CORES = 1.0 + + private val nextDriverNumber = new AtomicLong(0) + // For application IDs + private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) + private def newDriverId(submitDate: Date): String = + f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d" + + /** + * Build a driver description from the fields specified in the submit request. + * + * This involves constructing a command that launches a mesos framework for the job. + * This does not currently consider fields used by python applications since python + * is not supported in mesos cluster mode yet. + */ + private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = { + // Required fields, including the main class because python is not yet supported + val appResource = Option(request.appResource).getOrElse { + throw new SubmitRestMissingFieldException("Application jar is missing.") + } + val mainClass = Option(request.mainClass).getOrElse { + throw new SubmitRestMissingFieldException("Main class is missing.") + } + + // Optional fields + val sparkProperties = request.sparkProperties + val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") + val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") + val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") + val superviseDriver = sparkProperties.get("spark.driver.supervise") + val driverMemory = sparkProperties.get("spark.driver.memory") + val driverCores = sparkProperties.get("spark.driver.cores") + val appArgs = request.appArgs + val environmentVariables = request.environmentVariables + val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) + + // Construct driver description + val conf = new SparkConf(false).setAll(sparkProperties) + val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) + val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) + val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts + val command = new Command( + mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) + val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) + val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) + val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) + val submitDate = new Date() + val submissionId = newDriverId(submitDate) + + new MesosDriverDescription( + name, appResource, actualDriverMemory, 
actualDriverCores, actualSuperviseDriver, + command, request.sparkProperties, submissionId, submitDate) + } + + protected override def handleSubmit( + requestMessageJson: String, + requestMessage: SubmitRestProtocolMessage, + responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { + requestMessage match { + case submitRequest: CreateSubmissionRequest => + val driverDescription = buildDriverDescription(submitRequest) + val s = scheduler.submitDriver(driverDescription) + s.serverSparkVersion = sparkVersion + val unknownFields = findUnknownFields(requestMessageJson, requestMessage) + if (unknownFields.nonEmpty) { + // If there are fields that the server does not know about, warn the client + s.unknownFields = unknownFields + } + s + case unexpected => + responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST) + handleError(s"Received message of unexpected type ${unexpected.messageType}.") + } + } +} + +private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) + extends KillRequestServlet { + protected override def handleKill(submissionId: String): KillSubmissionResponse = { + val k = scheduler.killDriver(submissionId) + k.serverSparkVersion = sparkVersion + k + } +} + +private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) + extends StatusRequestServlet { + protected override def handleStatus(submissionId: String): SubmissionStatusResponse = { + val d = scheduler.getDriverStatus(submissionId) + d.serverSparkVersion = sparkVersion + d + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala new file mode 100644 index 0000000..ee9149c --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.executor + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ + +import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver} +import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} +import org.apache.mesos.protobuf.ByteString + +import org.apache.spark.{SparkConf, SparkEnv, TaskState} +import org.apache.spark.TaskState +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData} +import org.apache.spark.util.Utils + +private[spark] class MesosExecutorBackend + extends MesosExecutor + with MesosSchedulerUtils // TODO: fix + with ExecutorBackend + with Logging { + + var executor: Executor = null + var driver: ExecutorDriver = null + + override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) { + val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() + driver.sendStatusUpdate(MesosTaskStatus.newBuilder() + .setTaskId(mesosTaskId) + .setState(taskStateToMesos(state)) + .setData(ByteString.copyFrom(data)) + .build()) + } + + override def registered( + driver: ExecutorDriver, + executorInfo: ExecutorInfo, + frameworkInfo: FrameworkInfo, + slaveInfo: SlaveInfo) { + + // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend. + val cpusPerTask = executorInfo.getResourcesList.asScala + .find(_.getName == "cpus") + .map(_.getScalar.getValue.toInt) + .getOrElse(0) + val executorId = executorInfo.getExecutorId.getValue + + logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus") + this.driver = driver + // Set a context class loader to be picked up by the serializer. Without this call + // the serializer would default to the null class loader, and fail to find Spark classes + // See SPARK-10986. + Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader) + + val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ + Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) + val conf = new SparkConf(loadDefaults = true).setAll(properties) + val port = conf.getInt("spark.executor.port", 0) + val env = SparkEnv.createExecutorEnv( + conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false) + + executor = new Executor( + executorId, + slaveInfo.getHostname, + env) + } + + override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { + val taskId = taskInfo.getTaskId.getValue.toLong + val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData) + if (executor == null) { + logError("Received launchTask but executor was null") + } else { + SparkHadoopUtil.get.runAsSparkUser { () => + executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber, + taskInfo.getName, taskData.serializedTask) + } + } + } + + override def error(d: ExecutorDriver, message: String) { + logError("Error from Mesos: " + message) + } + + override def killTask(d: ExecutorDriver, t: TaskID) { + if (executor == null) { + logError("Received KillTask but executor was null") + } else { + // TODO: Determine the 'interruptOnCancel' property set for the given job. 
+ executor.killTask(t.getValue.toLong, interruptThread = false) + } + } + + override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} + + override def disconnected(d: ExecutorDriver) {} + + override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} + + override def shutdown(d: ExecutorDriver) {} +} + +/** + * Entry point for Mesos executor. + */ +private[spark] object MesosExecutorBackend extends Logging { + def main(args: Array[String]) { + Utils.initDaemon(log) + // Create a new Executor and start it running + val runner = new MesosExecutorBackend() + new MesosExecutorDriver(runner).run() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala new file mode 100644 index 0000000..ed29b34 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} + +/** + * Cluster Manager for creation of Mesos scheduler and backend + */ +private[spark] class MesosClusterManager extends ExternalClusterManager { + private val MESOS_REGEX = """mesos://(.*)""".r + + override def canCreate(masterURL: String): Boolean = { + masterURL.startsWith("mesos") + } + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + new TaskSchedulerImpl(sc) + } + + override def createSchedulerBackend(sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { + require(!sc.conf.get(IO_ENCRYPTION_ENABLED), + "I/O encryption is currently not supported in Mesos.") + + val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1) + val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) + if (coarse) { + new MesosCoarseGrainedSchedulerBackend( + scheduler.asInstanceOf[TaskSchedulerImpl], + sc, + mesosUrl, + sc.env.securityManager) + } else { + new MesosFineGrainedSchedulerBackend( + scheduler.asInstanceOf[TaskSchedulerImpl], + sc, + mesosUrl) + } + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala new file mode 100644 index 0000000..61ab3e8 --- /dev/null +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import scala.collection.JavaConverters._ + +import org.apache.curator.framework.CuratorFramework +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.KeeperException.NoNodeException + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkCuratorUtil +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Persistence engine factory that is responsible for creating new persistence engines + * to store Mesos cluster mode state. + */ +private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) { + def createEngine(path: String): MesosClusterPersistenceEngine +} + +/** + * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode + * specific state, so that on failover all the state can be recovered and the scheduler + * can resume managing the drivers. + */ +private[spark] trait MesosClusterPersistenceEngine { + def persist(name: String, obj: Object): Unit + def expunge(name: String): Unit + def fetch[T](name: String): Option[T] + def fetchAll[T](): Iterable[T] +} + +/** + * Zookeeper-backed persistence engine factory. + * All Zk engines created from this factory share the same Zookeeper client, so + * all of them reuse the same connection pool. + */ +private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf) + extends MesosClusterPersistenceEngineFactory(conf) with Logging { + + lazy val zk = SparkCuratorUtil.newClient(conf) + + def createEngine(path: String): MesosClusterPersistenceEngine = { + new ZookeeperMesosClusterPersistenceEngine(path, zk, conf) + } +} + +/** + * Black hole persistence engine factory that creates black hole + * persistence engines, which store nothing. + */ +private[spark] class BlackHoleMesosClusterPersistenceEngineFactory + extends MesosClusterPersistenceEngineFactory(null) { + def createEngine(path: String): MesosClusterPersistenceEngine = { + new BlackHoleMesosClusterPersistenceEngine + } +} + +/** + * Black hole persistence engine that stores nothing. + */ +private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine { + override def persist(name: String, obj: Object): Unit = {} + override def fetch[T](name: String): Option[T] = None + override def expunge(name: String): Unit = {} + override def fetchAll[T](): Iterable[T] = Iterable.empty[T] +} + +/** + * Zookeeper-based Mesos cluster persistence engine that stores cluster mode state + * in Zookeeper. Each engine object operates under one folder in Zookeeper, but + * reuses a shared Zookeeper client.
+ */ +private[spark] class ZookeeperMesosClusterPersistenceEngine( + baseDir: String, + zk: CuratorFramework, + conf: SparkConf) + extends MesosClusterPersistenceEngine with Logging { + private val WORKING_DIR = + conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir + + SparkCuratorUtil.mkdir(zk, WORKING_DIR) + + def path(name: String): String = { + WORKING_DIR + "/" + name + } + + override def expunge(name: String): Unit = { + zk.delete().forPath(path(name)) + } + + override def persist(name: String, obj: Object): Unit = { + val serialized = Utils.serialize(obj) + val zkPath = path(name) + zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized) + } + + override def fetch[T](name: String): Option[T] = { + val zkPath = path(name) + + try { + val fileData = zk.getData().forPath(zkPath) + Some(Utils.deserialize[T](fileData)) + } catch { + case e: NoNodeException => None + case e: Exception => + logWarning("Exception while reading persisted file, deleting", e) + zk.delete().forPath(zkPath) + None + } + } + + override def fetchAll[T](): Iterable[T] = { + zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T]) + } +}
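
Not part of the patch, but as a minimal sketch of the MesosClusterPersistenceEngine contract introduced above (persist/expunge/fetch/fetchAll, each name mapping to one stored object), an in-memory engine could look like the following. The Zookeeper engine in this diff implements the same four operations against znodes under spark.deploy.zookeeper.dir/<baseDir>; the class below is hypothetical and exists only to illustrate the expected behaviour of the trait.

import scala.collection.mutable

// Illustrative only: an in-memory MesosClusterPersistenceEngine. Each name maps to
// one stored object, mirroring how the Zookeeper engine maps each name to one znode.
class InMemoryMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
  private val store = mutable.Map.empty[String, Object]

  override def persist(name: String, obj: Object): Unit = { store(name) = obj }
  override def expunge(name: String): Unit = { store.remove(name) }
  override def fetch[T](name: String): Option[T] = store.get(name).map(_.asInstanceOf[T])
  override def fetchAll[T](): Iterable[T] = store.values.toSeq.map(_.asInstanceOf[T])
}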

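As a further illustration (again not part of the patch), MesosSubmitRequestServlet above derives submission IDs from the submit date plus an atomic counter. A standalone sketch of that naming scheme follows; the object name and main method are hypothetical and added purely so the snippet can be run on its own.

import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import java.util.concurrent.atomic.AtomicLong

object DriverIdSketch {
  private val nextDriverNumber = new AtomicLong(0)
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)

  // Same format as the servlet: driver-<yyyyMMddHHmmss>-<zero-padded counter>
  def newDriverId(submitDate: Date): String =
    f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"

  def main(args: Array[String]): Unit = {
    // Prints something like: driver-20170101120000-0001
    println(newDriverId(new Date()))
  }
}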