http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala new file mode 100644 index 0000000..ec47ab1 --- /dev/null +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import scala.collection.JavaConverters._ +import scala.language.reflectiveCalls + +import org.apache.mesos.Protos.{Resource, Value} +import org.mockito.Mockito._ +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config._ + +class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { + + // scalastyle:off structural.type + // this is the documented way of generating fixtures in scalatest + def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new { + val sparkConf = new SparkConf + val sc = mock[SparkContext] + when(sc.conf).thenReturn(sparkConf) + } + + private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = { + val rangeValue = Value.Range.newBuilder() + rangeValue.setBegin(range._1) + rangeValue.setEnd(range._2) + val builder = Resource.newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) + + role.foreach { r => builder.setRole(r) } + builder.build() + } + + private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{resource => resource.getRanges.getRangeList + .asScala.map(range => (range.getBegin, range.getEnd))} + } + + def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def arePortsEqual(array1: Array[Long], array2: Array[Long]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{ resource => + resource.getRanges.getRangeList.asScala.toList.map{ + range => (range.getBegin, range.getEnd)}} + } + + val utils = new MesosSchedulerUtils { } + // scalastyle:on structural.type + + test("use at-least minimum overhead") { + val f = 
fixture + when(f.sc.executorMemory).thenReturn(512) + utils.executorMemory(f.sc) shouldBe 896 + } + + test("use overhead if it is greater than minimum value") { + val f = fixture + when(f.sc.executorMemory).thenReturn(4096) + utils.executorMemory(f.sc) shouldBe 4505 + } + + test("use spark.mesos.executor.memoryOverhead (if set)") { + val f = fixture + when(f.sc.executorMemory).thenReturn(1024) + f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") + utils.executorMemory(f.sc) shouldBe 1536 + } + + test("parse a non-empty constraint string correctly") { + val expectedMap = Map( + "os" -> Set("centos7"), + "zone" -> Set("us-east-1a", "us-east-1b") + ) + utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap) + } + + test("parse an empty constraint string correctly") { + utils.parseConstraintString("") shouldBe Map() + } + + test("throw an exception when the input is malformed") { + an[IllegalArgumentException] should be thrownBy + utils.parseConstraintString("os;zone:us-east") + } + + test("empty values for attributes' constraints matches all values") { + val constraintsStr = "os:" + val parsedConstraints = utils.parseConstraintString(constraintsStr) + + parsedConstraints shouldBe Map("os" -> Set()) + + val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build() + val noOsOffer = Map("zone" -> zoneSet) + val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build()) + val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build()) + + utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false + utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true + utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true + } + + test("subset match is performed for set attributes") { + val supersetConstraint = Map( + "os" -> Value.Text.newBuilder().setValue("ubuntu").build(), + "zone" -> Value.Set.newBuilder() + .addItem("us-east-1a") + .addItem("us-east-1b") + .addItem("us-east-1c") + .build()) + + val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c" + val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) + + utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true + } + + test("less than equal match is performed on scalar attributes") { + val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build()) + + val ltConstraint = utils.parseConstraintString("gpus:2") + val eqConstraint = utils.parseConstraintString("gpus:3") + val gtConstraint = utils.parseConstraintString("gpus:4") + + utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false + } + + test("contains match is performed for range attributes") { + val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build()) + val ltConstraint = utils.parseConstraintString("ports:6000") + val eqConstraint = utils.parseConstraintString("ports:7500") + val gtConstraint = utils.parseConstraintString("ports:8002") + val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300") + + utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false + utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(gtConstraint, offerAttribs) 
shouldBe false + utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true + } + + test("equality match is performed for text attributes") { + val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build()) + + val trueConstraint = utils.parseConstraintString("os:centos7") + val falseConstraint = utils.parseConstraintString("os:ubuntu") + + utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false + } + + test("Port reservation is done correctly with user specified ports only") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3000" ) + conf.set(BLOCK_MANAGER_PORT, 4000) + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3000, 4000), List(portResource)) + resourcesToBeUsed.length shouldBe 2 + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray + + portsToUse.length shouldBe 2 + arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true + + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUSed = Array((3000L, 3000L), (4000L, 4000L)) + + arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true + } + + test("Port reservation is done correctly with some user specified ports (spark.executor.port)") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3100" ) + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3100), List(portResource)) + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 1 + portsToUse.contains(3100) shouldBe true + } + + test("Port reservation is done correctly with all random ports") { + val conf = new SparkConf() + val portResource = createTestPortResource((3000L, 5000L), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), List(portResource)) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.isEmpty shouldBe true + } + + test("Port reservation is done correctly with user specified ports only - multiple ranges") { + val conf = new SparkConf() + conf.set("spark.executor.port", "2100" ) + conf.set("spark.blockManager.port", "4000") + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(2100, 4000), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 2 + val portsRangesLeft = rangesResourcesToTuple(resourcesLeft) + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUsed = Array((2100L, 2100L), (4000L, 4000L)) + + arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true + arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true + } + + test("Port reservation is done correctly with all random ports - multiple ranges") { + val conf = new SparkConf() + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), 
portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + portsToUse.isEmpty shouldBe true + } +}
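For reference, the executorMemory expectations asserted above (896, 4505 and 1536 MB) are consistent with an overhead rule of max(10% of executor memory, 384 MB), with spark.mesos.executor.memoryOverhead taking precedence when set. The sketch below reproduces those figures under that assumption; the constants are inferred from the test values, not quoted from the suite's implementation.

// Minimal sketch of the assumed overhead rule behind the assertions above;
// not the suite's actual implementation.
object MesosExecutorMemorySketch {
  private val overheadFraction = 0.10 // assumed: 10% of executor memory
  private val overheadMinimumMb = 384 // assumed: floor in MB

  def totalExecutorMemory(executorMemoryMb: Int, overheadOverrideMb: Option[Int] = None): Int = {
    // An explicit spark.mesos.executor.memoryOverhead value wins; otherwise
    // take max(10% of executor memory, 384 MB).
    val overhead = overheadOverrideMb.getOrElse(
      math.max(overheadFraction * executorMemoryMb, overheadMinimumMb).toInt)
    executorMemoryMb + overhead
  }

  def main(args: Array[String]): Unit = {
    println(totalExecutorMemory(512))             // 896: the 384 MB floor applies (10% is only 51 MB)
    println(totalExecutorMemory(4096))            // 4505: 10% (409 MB) exceeds the floor
    println(totalExecutorMemory(1024, Some(512))) // 1536: explicit override of the overhead
  }
}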
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala new file mode 100644 index 0000000..5a81bb3 --- /dev/null +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.nio.ByteBuffer + +import org.apache.spark.SparkFunSuite + +class MesosTaskLaunchDataSuite extends SparkFunSuite { + test("serialize and deserialize data must be same") { + val serializedTask = ByteBuffer.allocate(40) + (Range(100, 110).map(serializedTask.putInt(_))) + serializedTask.rewind + val attemptNumber = 100 + val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString + serializedTask.rewind + val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString) + assert(mesosTaskLaunchData.attemptNumber == attemptNumber) + assert(mesosTaskLaunchData.serializedTask.equals(serializedTask)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala new file mode 100644 index 0000000..7ebb294 --- /dev/null +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.util.Collections + +import scala.collection.JavaConverters._ + +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} +import org.apache.mesos.SchedulerDriver +import org.mockito.{ArgumentCaptor, Matchers} +import org.mockito.Mockito._ + +object Utils { + def createOffer( + offerId: String, + slaveId: String, + mem: Int, + cpus: Int, + ports: Option[(Long, Long)] = None, + gpus: Int = 0): Offer = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpus)) + ports.foreach { resourcePorts => + builder.addResourcesBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() + .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) + } + if (gpus > 0) { + builder.addResourcesBuilder() + .setName("gpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(gpus)) + } + builder.setId(createOfferId(offerId)) + .setFrameworkId(FrameworkID.newBuilder() + .setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) + .setHostname(s"host${slaveId}") + .build() + } + + def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = { + val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]]) + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(createOfferId(offerId))), + captor.capture()) + captor.getValue.asScala.toList + } + + def createOfferId(offerId: String): OfferID = { + OfferID.newBuilder().setValue(offerId).build() + } + + def createSlaveId(slaveId: String): SlaveID = { + SlaveID.newBuilder().setValue(slaveId).build() + } + + def createExecutorId(executorId: String): ExecutorID = { + ExecutorID.newBuilder().setValue(executorId).build() + } + + def createTaskId(taskId: String): TaskID = { + TaskID.newBuilder().setValue(taskId).build() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/pom.xml ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml new file mode 100644 index 0000000..04b51dc --- /dev/null +++ b/resource-managers/yarn/pom.xml @@ -0,0 +1,215 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>spark-yarn_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Project YARN</name> + <properties> + <sbt.project.name>yarn</sbt.project.name> + <jersey-1.version>1.9</jersey-1.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-network-yarn_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-tags_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + <!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-plus</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlets</artifactId> + </dependency> + <!-- End of shaded deps. --> + + <!-- + SPARK-10059: Explicitly add JSP dependencies for tests since the MiniYARN cluster needs them. + --> + <dependency> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet.jsp</artifactId> + <version>2.2.0.v201112011158</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.orbit</groupId> + <artifactId>javax.servlet.jsp.jstl</artifactId> + <version>1.2.0.v201105211821</version> + <scope>test</scope> + </dependency> + + <!-- + See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed + dependencies, so they need to be added manually for the tests to work. 
+ --> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + <version>6.1.26</version> + <exclusions> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + + <!-- + Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster + in the JVM requires starting a Jersey 1-based web application. + --> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + <scope>test</scope> + <version>${jersey-1.version}</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + <scope>test</scope> + <version>${jersey-1.version}</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + <scope>test</scope> + <version>${jersey-1.version}</version> + </dependency> + <dependency> + <groupId>com.sun.jersey.contribs</groupId> + <artifactId>jersey-guice</artifactId> + <scope>test</scope> + <version>${jersey-1.version}</version> + </dependency> + + <!-- + Testing Hive reflection needs hive on the test classpath only. + It doesn't need the spark hive modules, so the -Phive flag is not checked. + --> + <dependency> + <groupId>${hive.group}</groupId> + <artifactId>hive-exec</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${hive.group}</groupId> + <artifactId>hive-metastore</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider new file mode 100644 index 0000000..22ead56 --- /dev/null +++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider @@ -0,0 +1,3 @@ +org.apache.spark.deploy.yarn.security.HDFSCredentialProvider +org.apache.spark.deploy.yarn.security.HBaseCredentialProvider +org.apache.spark.deploy.yarn.security.HiveCredentialProvider http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager ---------------------------------------------------------------------- diff --git 
a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager new file mode 100644 index 0000000..6e8a1eb --- /dev/null +++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -0,0 +1 @@ +org.apache.spark.scheduler.cluster.YarnClusterManager http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala new file mode 100644 index 0000000..0378ef4 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -0,0 +1,791 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.{File, IOException} +import java.lang.reflect.InvocationTargetException +import java.net.{Socket, URI, URL} +import java.util.concurrent.{TimeoutException, TimeUnit} + +import scala.collection.mutable.HashMap +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.{ConverterUtils, Records} + +import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.HistoryServer +import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.rpc._ +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.util._ + +/** + * Common application master functionality for Spark on Yarn. + */ +private[spark] class ApplicationMaster( + args: ApplicationMasterArguments, + client: YarnRMClient) + extends Logging { + + // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be + // optimal as more containers are available. Might need to handle this better. 
+ + private val sparkConf = new SparkConf() + private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) + .asInstanceOf[YarnConfiguration] + private val isClusterMode = args.userClass != null + + // Default to twice the number of executors (twice the maximum number of executors if dynamic + // allocation is enabled), with a minimum of 3. + + private val maxNumExecutorFailures = { + val effectiveNumExecutors = + if (Utils.isDynamicAllocationEnabled(sparkConf)) { + sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS) + } else { + sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0) + } + // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need + // avoid the integer overflow here. + val defaultMaxNumExecutorFailures = math.max(3, + if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors)) + + sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures) + } + + @volatile private var exitCode = 0 + @volatile private var unregistered = false + @volatile private var finished = false + @volatile private var finalStatus = getDefaultFinalStatus + @volatile private var finalMsg: String = "" + @volatile private var userClassThread: Thread = _ + + @volatile private var reporterThread: Thread = _ + @volatile private var allocator: YarnAllocator = _ + + // Lock for controlling the allocator (heartbeat) thread. + private val allocatorLock = new Object() + + // Steady state heartbeat interval. We want to be reasonably responsive without causing too many + // requests to RM. + private val heartbeatInterval = { + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. + val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) + math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL))) + } + + // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are + // being requested. + private val initialAllocationInterval = math.min(heartbeatInterval, + sparkConf.get(INITIAL_HEARTBEAT_INTERVAL)) + + // Next wait interval before allocator poll. + private var nextAllocationInterval = initialAllocationInterval + + private var rpcEnv: RpcEnv = null + private var amEndpoint: RpcEndpointRef = _ + + // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. + private val sparkContextPromise = Promise[SparkContext]() + + private var credentialRenewer: AMCredentialRenewer = _ + + // Load the list of localized files set by the client. This is used when launching executors, + // and is loaded here so that these configs don't pollute the Web UI's environment page in + // cluster mode. 
+ private val localResources = { + logInfo("Preparing Local resources") + val resources = HashMap[String, LocalResource]() + + def setupDistributedCache( + file: String, + rtype: LocalResourceType, + timestamp: String, + size: String, + vis: String): Unit = { + val uri = new URI(file) + val amJarRsrc = Records.newRecord(classOf[LocalResource]) + amJarRsrc.setType(rtype) + amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) + amJarRsrc.setTimestamp(timestamp.toLong) + amJarRsrc.setSize(size.toLong) + + val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName()) + resources(fileName) = amJarRsrc + } + + val distFiles = sparkConf.get(CACHED_FILES) + val fileSizes = sparkConf.get(CACHED_FILES_SIZES) + val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS) + val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES) + val resTypes = sparkConf.get(CACHED_FILES_TYPES) + + for (i <- 0 to distFiles.size - 1) { + val resType = LocalResourceType.valueOf(resTypes(i)) + setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString, + visibilities(i)) + } + + // Distribute the conf archive to executors. + sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path => + val uri = new URI(path) + val fs = FileSystem.get(uri, yarnConf) + val status = fs.getFileStatus(new Path(uri)) + // SPARK-16080: Make sure to use the correct name for the destination when distributing the + // conf archive to executors. + val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(), + Client.LOCALIZED_CONF_DIR) + setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE, + status.getModificationTime().toString, status.getLen.toString, + LocalResourceVisibility.PRIVATE.name()) + } + + // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy). + CACHE_CONFIGS.foreach { e => + sparkConf.remove(e) + sys.props.remove(e.key) + } + + resources.toMap + } + + def getAttemptId(): ApplicationAttemptId = { + client.getAttemptId() + } + + final def run(): Int = { + try { + val appAttemptId = client.getAttemptId() + + var attemptID: Option[String] = None + + if (isClusterMode) { + // Set the web ui port to be ephemeral for yarn so we don't conflict with + // other spark processes running on the same box + System.setProperty("spark.ui.port", "0") + + // Set the master and deploy mode property to match the requested mode. + System.setProperty("spark.master", "yarn") + System.setProperty("spark.submit.deployMode", "cluster") + + // Set this internal configuration if it is running on cluster mode, this + // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode. + System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) + + attemptID = Option(appAttemptId.getAttemptId.toString) + } + + new CallerContext( + "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT), + Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext() + + logInfo("ApplicationAttemptId: " + appAttemptId) + + val fs = FileSystem.get(yarnConf) + + // This shutdown hook should run *after* the SparkContext is shut down. 
+ val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1 + ShutdownHookManager.addShutdownHook(priority) { () => + val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf) + val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts + + if (!finished) { + // The default state of ApplicationMaster is failed if it is invoked by shut down hook. + // This behavior is different compared to 1.x version. + // If user application is exited ahead of time by calling System.exit(N), here mark + // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call + // System.exit(0) to terminate the application. + finish(finalStatus, + ApplicationMaster.EXIT_EARLY, + "Shutdown hook called before final status was reported.") + } + + if (!unregistered) { + // we only want to unregister if we don't want the RM to retry + if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) { + unregister(finalStatus, finalMsg) + cleanupStagingDir(fs) + } + } + } + + // Call this to force generation of secret so it gets populated into the + // Hadoop UGI. This has to happen before the startUserApplication which does a + // doAs in order for the credentials to be passed on to the executor containers. + val securityMgr = new SecurityManager(sparkConf) + + // If the credentials file config is present, we must periodically renew tokens. So create + // a new AMDelegationTokenRenewer + if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) { + // If a principal and keytab have been set, use that to create new credentials for executors + // periodically + credentialRenewer = + new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer() + credentialRenewer.scheduleLoginFromKeytab() + } + + if (isClusterMode) { + runDriver(securityMgr) + } else { + runExecutorLauncher(securityMgr) + } + } catch { + case e: Exception => + // catch everything else if not specifically handled + logError("Uncaught exception: ", e) + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION, + "Uncaught exception: " + e) + } + exitCode + } + + /** + * Set the default final application status for client mode to UNDEFINED to handle + * if YARN HA restarts the application so that it properly retries. Set the final + * status to SUCCEEDED in cluster mode to handle if the user calls System.exit + * from the application code. + */ + final def getDefaultFinalStatus(): FinalApplicationStatus = { + if (isClusterMode) { + FinalApplicationStatus.FAILED + } else { + FinalApplicationStatus.UNDEFINED + } + } + + /** + * unregister is used to completely unregister the application from the ResourceManager. + * This means the ResourceManager will not retry the application attempt on your behalf if + * a failure occurred. 
+ */ + final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { + synchronized { + if (!unregistered) { + logInfo(s"Unregistering ApplicationMaster with $status" + + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) + unregistered = true + client.unregister(status, Option(diagnostics).getOrElse("")) + } + } + } + + final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = { + synchronized { + if (!finished) { + val inShutdown = ShutdownHookManager.inShutdown() + logInfo(s"Final app status: $status, exitCode: $code" + + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) + exitCode = code + finalStatus = status + finalMsg = msg + finished = true + if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { + logDebug("shutting down reporter thread") + reporterThread.interrupt() + } + if (!inShutdown && Thread.currentThread() != userClassThread && userClassThread != null) { + logDebug("shutting down user thread") + userClassThread.interrupt() + } + if (!inShutdown && credentialRenewer != null) { + credentialRenewer.stop() + credentialRenewer = null + } + } + } + } + + private def sparkContextInitialized(sc: SparkContext) = { + sparkContextPromise.success(sc) + } + + private def registerAM( + _sparkConf: SparkConf, + _rpcEnv: RpcEnv, + driverRef: RpcEndpointRef, + uiAddress: String, + securityMgr: SecurityManager) = { + val appId = client.getAttemptId().getApplicationId().toString() + val attemptId = client.getAttemptId().getAttemptId().toString() + val historyAddress = + _sparkConf.get(HISTORY_SERVER_ADDRESS) + .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) } + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" } + .getOrElse("") + + val driverUrl = RpcEndpointAddress( + _sparkConf.get("spark.driver.host"), + _sparkConf.get("spark.driver.port").toInt, + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString + + // Before we initialize the allocator, let's log the information about how executors will + // be run up front, to avoid printing this out for every single executor being launched. + // Use placeholders for information that changes such as executor IDs. + logInfo { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt + val executorCores = sparkConf.get(EXECUTOR_CORES) + val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>", + "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources) + dummyRunner.launchContextDebugInfo() + } + + allocator = client.register(driverUrl, + driverRef, + yarnConf, + _sparkConf, + uiAddress, + historyAddress, + securityMgr, + localResources) + + allocator.allocateResources() + reporterThread = launchReporterThread() + } + + /** + * Create an [[RpcEndpoint]] that communicates with the driver. + * + * In cluster mode, the AM and the driver belong to same process + * so the AMEndpoint need not monitor lifecycle of the driver. + * + * @return A reference to the driver's RPC endpoint. 
+ */ + private def runAMEndpoint( + host: String, + port: String, + isClusterMode: Boolean): RpcEndpointRef = { + val driverEndpoint = rpcEnv.setupEndpointRef( + RpcAddress(host, port.toInt), + YarnSchedulerBackend.ENDPOINT_NAME) + amEndpoint = + rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode)) + driverEndpoint + } + + private def runDriver(securityMgr: SecurityManager): Unit = { + addAmIpFilter() + userClassThread = startUserApplication() + + // This a bit hacky, but we need to wait until the spark.driver.port property has + // been set by the Thread executing the user class. + logInfo("Waiting for spark context initialization...") + val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) + try { + val sc = ThreadUtils.awaitResult(sparkContextPromise.future, + Duration(totalWaitTime, TimeUnit.MILLISECONDS)) + if (sc != null) { + rpcEnv = sc.env.rpcEnv + val driverRef = runAMEndpoint( + sc.getConf.get("spark.driver.host"), + sc.getConf.get("spark.driver.port"), + isClusterMode = true) + registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl).getOrElse(""), + securityMgr) + } else { + // Sanity check; should never happen in normal operation, since sc should only be null + // if the user app did not create a SparkContext. + if (!finished) { + throw new IllegalStateException("SparkContext is null but app is still running!") + } + } + userClassThread.join() + } catch { + case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => + logError( + s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + + "Please check earlier log output for errors. Failing the application.") + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_SC_NOT_INITED, + "Timed out waiting for SparkContext.") + } + } + + private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { + val port = sparkConf.get(AM_PORT) + rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr, + clientMode = true) + val driverRef = waitForSparkDriver() + addAmIpFilter() + registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""), + securityMgr) + + // In client mode the actor will stop the reporter thread. + reporterThread.join() + } + + private def launchReporterThread(): Thread = { + // The number of failures in a row until Reporter thread give up + val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) + + val t = new Thread { + override def run() { + var failureCount = 0 + while (!finished) { + try { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, + s"Max number of executor failures ($maxNumExecutorFailures) reached") + } else { + logDebug("Sending progress") + allocator.allocateResources() + } + failureCount = 0 + } catch { + case i: InterruptedException => + case e: Throwable => + failureCount += 1 + // this exception was introduced in hadoop 2.4 and this code would not compile + // with earlier versions if we refer it directly. 
+ if ("org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException" == + e.getClass().getName()) { + logError("Exception from Reporter thread.", e) + finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, + e.getMessage) + } else if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + + s"$failureCount time(s) from Reporter thread.") + } else { + logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) + } + } + try { + val numPendingAllocate = allocator.getPendingAllocate.size + var sleepStart = 0L + var sleepInterval = 200L // ms + allocatorLock.synchronized { + sleepInterval = + if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { + val currentAllocationInterval = + math.min(heartbeatInterval, nextAllocationInterval) + nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow + currentAllocationInterval + } else { + nextAllocationInterval = initialAllocationInterval + heartbeatInterval + } + sleepStart = System.currentTimeMillis() + allocatorLock.wait(sleepInterval) + } + val sleepDuration = System.currentTimeMillis() - sleepStart + if (sleepDuration < sleepInterval) { + // log when sleep is interrupted + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Slept for $sleepDuration/$sleepInterval ms.") + // if sleep was less than the minimum interval, sleep for the rest of it + val toSleep = math.max(0, initialAllocationInterval - sleepDuration) + if (toSleep > 0) { + logDebug(s"Going back to sleep for $toSleep ms") + // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up + // by the methods that signal allocatorLock because this is just finishing the min + // sleep interval, which should happen even if this is signalled again. + Thread.sleep(toSleep) + } + } else { + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Slept for $sleepDuration/$sleepInterval.") + } + } catch { + case e: InterruptedException => + } + } + } + } + // setting to daemon status, though this is usually not a good idea. + t.setDaemon(true) + t.setName("Reporter") + t.start() + logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " + + s"initial allocation : $initialAllocationInterval) intervals") + t + } + + /** + * Clean up the staging directory. + */ + private def cleanupStagingDir(fs: FileSystem) { + var stagingDirPath: Path = null + try { + val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) + } + } + + private def waitForSparkDriver(): RpcEndpointRef = { + logInfo("Waiting for Spark driver to be reachable.") + var driverUp = false + val hostport = args.userArgs(0) + val (driverHost, driverPort) = Utils.parseHostPort(hostport) + + // Spark driver should already be up since it launched us, but we don't want to + // wait forever, so wait 100 seconds max to match the cluster mode setting. 
+ val totalWaitTimeMs = sparkConf.get(AM_MAX_WAIT_TIME) + val deadline = System.currentTimeMillis + totalWaitTimeMs + + while (!driverUp && !finished && System.currentTimeMillis < deadline) { + try { + val socket = new Socket(driverHost, driverPort) + socket.close() + logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) + driverUp = true + } catch { + case e: Exception => + logError("Failed to connect to driver at %s:%s, retrying ...". + format(driverHost, driverPort)) + Thread.sleep(100L) + } + } + + if (!driverUp) { + throw new SparkException("Failed to connect to driver!") + } + + sparkConf.set("spark.driver.host", driverHost) + sparkConf.set("spark.driver.port", driverPort.toString) + + runAMEndpoint(driverHost, driverPort.toString, isClusterMode = false) + } + + /** Add the Yarn IP filter that is required for properly securing the UI. */ + private def addAmIpFilter() = { + val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + val params = client.getAmIpFilterParams(yarnConf, proxyBase) + if (isClusterMode) { + System.setProperty("spark.ui.filters", amFilter) + params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } + } else { + amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase)) + } + } + + /** + * Start the user class, which contains the spark driver, in a separate Thread. + * If the main routine exits cleanly or exits with System.exit(N) for any N + * we assume it was successful, for all other cases we assume failure. + * + * Returns the user thread that was started. + */ + private def startUserApplication(): Thread = { + logInfo("Starting the user application in a separate Thread") + + val classpath = Client.getUserClasspath(sparkConf) + val urls = classpath.map { entry => + new URL("file:" + new File(entry.getPath()).getAbsolutePath()) + } + val userClassLoader = + if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { + new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } else { + new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } + + var userArgs = args.userArgs + if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { + // When running pyspark, the app is run using PythonRunner. The second argument is the list + // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty. 
+ userArgs = Seq(args.primaryPyFile, "") ++ userArgs + } + if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { + // TODO(davies): add R dependencies here + } + val mainMethod = userClassLoader.loadClass(args.userClass) + .getMethod("main", classOf[Array[String]]) + + val userThread = new Thread { + override def run() { + try { + mainMethod.invoke(null, userArgs.toArray) + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + logDebug("Done running users class") + } catch { + case e: InvocationTargetException => + e.getCause match { + case _: InterruptedException => + // Reporter thread can interrupt to stop user class + case SparkUserAppException(exitCode) => + val msg = s"User application exited with status $exitCode" + logError(msg) + finish(FinalApplicationStatus.FAILED, exitCode, msg) + case cause: Throwable => + logError("User class threw exception: " + cause, cause) + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_EXCEPTION_USER_CLASS, + "User class threw exception: " + cause) + } + sparkContextPromise.tryFailure(e.getCause()) + } finally { + // Notify the thread waiting for the SparkContext, in case the application did not + // instantiate one. This will do nothing when the user code instantiates a SparkContext + // (with the correct master), or when the user code throws an exception (due to the + // tryFailure above). + sparkContextPromise.trySuccess(null) + } + } + } + userThread.setContextClassLoader(userClassLoader) + userThread.setName("Driver") + userThread.start() + userThread + } + + private def resetAllocatorInterval(): Unit = allocatorLock.synchronized { + nextAllocationInterval = initialAllocationInterval + allocatorLock.notifyAll() + } + + /** + * An [[RpcEndpoint]] that communicates with the driver's scheduler backend. + */ + private class AMEndpoint( + override val rpcEnv: RpcEnv, driver: RpcEndpointRef, isClusterMode: Boolean) + extends RpcEndpoint with Logging { + + override def onStart(): Unit = { + driver.send(RegisterClusterManager(self)) + } + + override def receive: PartialFunction[Any, Unit] = { + case x: AddWebUIFilter => + logInfo(s"Add WebUI Filter. 
$x") + driver.send(x) + } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) => + Option(allocator) match { + case Some(a) => + if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal, + localityAwareTasks, hostToLocalTaskCount)) { + resetAllocatorInterval() + } + context.reply(true) + + case None => + logWarning("Container allocator is not ready to request executors yet.") + context.reply(false) + } + + case KillExecutors(executorIds) => + logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") + Option(allocator) match { + case Some(a) => executorIds.foreach(a.killExecutor) + case None => logWarning("Container allocator is not ready to kill executors yet.") + } + context.reply(true) + + case GetExecutorLossReason(eid) => + Option(allocator) match { + case Some(a) => + a.enqueueGetLossReasonRequest(eid, context) + resetAllocatorInterval() + case None => + logWarning("Container allocator is not ready to find executor loss reasons yet.") + } + } + + override def onDisconnected(remoteAddress: RpcAddress): Unit = { + // In cluster mode, do not rely on the disassociated event to exit + // This avoids potentially reporting incorrect exit codes if the driver fails + if (!isClusterMode) { + logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") + finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) + } + } + } + +} + +object ApplicationMaster extends Logging { + + // exit codes for different causes, no reason behind the values + private val EXIT_SUCCESS = 0 + private val EXIT_UNCAUGHT_EXCEPTION = 10 + private val EXIT_MAX_EXECUTOR_FAILURES = 11 + private val EXIT_REPORTER_FAILURE = 12 + private val EXIT_SC_NOT_INITED = 13 + private val EXIT_SECURITY = 14 + private val EXIT_EXCEPTION_USER_CLASS = 15 + private val EXIT_EARLY = 16 + + private var master: ApplicationMaster = _ + + def main(args: Array[String]): Unit = { + SignalUtils.registerLogger(log) + val amArgs = new ApplicationMasterArguments(args) + + // Load the properties file with the Spark configuration and set entries as system properties, + // so that user code run inside the AM also has access to them. + // Note: we must do this before SparkHadoopUtil instantiated + if (amArgs.propertiesFile != null) { + Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) => + sys.props(k) = v + } + } + SparkHadoopUtil.get.runAsSparkUser { () => + master = new ApplicationMaster(amArgs, new YarnRMClient) + System.exit(master.run()) + } + } + + private[spark] def sparkContextInitialized(sc: SparkContext): Unit = { + master.sparkContextInitialized(sc) + } + + private[spark] def getAttemptId(): ApplicationAttemptId = { + master.getAttemptId + } + +} + +/** + * This object does not provide any special functionality. It exists so that it's easy to tell + * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. 
+ */ +object ExecutorLauncher { + + def main(args: Array[String]): Unit = { + ApplicationMaster.main(args) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala new file mode 100644 index 0000000..5cdec87 --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.util.{IntParam, MemoryParam} + +class ApplicationMasterArguments(val args: Array[String]) { + var userJar: String = null + var userClass: String = null + var primaryPyFile: String = null + var primaryRFile: String = null + var userArgs: Seq[String] = Nil + var propertiesFile: String = null + + parseArgs(args.toList) + + private def parseArgs(inputArgs: List[String]): Unit = { + val userArgsBuffer = new ArrayBuffer[String]() + + var args = inputArgs + + while (!args.isEmpty) { + // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0, + // the properties with executor in their names are preferred. 
+ args match { + case ("--jar") :: value :: tail => + userJar = value + args = tail + + case ("--class") :: value :: tail => + userClass = value + args = tail + + case ("--primary-py-file") :: value :: tail => + primaryPyFile = value + args = tail + + case ("--primary-r-file") :: value :: tail => + primaryRFile = value + args = tail + + case ("--arg") :: value :: tail => + userArgsBuffer += value + args = tail + + case ("--properties-file") :: value :: tail => + propertiesFile = value + args = tail + + case _ => + printUsageAndExit(1, args) + } + } + + if (primaryPyFile != null && primaryRFile != null) { + // scalastyle:off println + System.err.println("Cannot have primary-py-file and primary-r-file at the same time") + // scalastyle:on println + System.exit(-1) + } + + userArgs = userArgsBuffer.toList + } + + def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { + // scalastyle:off println + if (unknownParam != null) { + System.err.println("Unknown/unsupported param " + unknownParam) + } + System.err.println(""" + |Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options] + |Options: + | --jar JAR_PATH Path to your application's JAR file + | --class CLASS_NAME Name of your application's main class + | --primary-py-file A main Python file + | --primary-r-file A main R file + | --arg ARG Argument to be passed to your application's main class. + | Multiple invocations are possible, each will be passed in order. + | --properties-file FILE Path to a custom Spark properties file. + """.stripMargin) + // scalastyle:on println + System.exit(exitCode) + } +} + +object ApplicationMasterArguments { + val DEFAULT_NUMBER_EXECUTORS = 2 +}
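For reference, a hypothetical invocation of the ApplicationMasterArguments parser above; the jar path and class name are invented for illustration, and the expected field values follow directly from the case clauses in parseArgs.

// Hypothetical usage sketch of the parser shown in the diff above.
object AmArgsParseExample {
  def main(args: Array[String]): Unit = {
    val amArgs = new org.apache.spark.deploy.yarn.ApplicationMasterArguments(Array(
      "--jar", "app.jar",
      "--class", "com.example.Main",
      "--arg", "first",
      "--arg", "second"))
    // --jar and --class fill the corresponding fields; repeated --arg flags
    // accumulate into userArgs in the order they were given.
    assert(amArgs.userJar == "app.jar")
    assert(amArgs.userClass == "com.example.Main")
    assert(amArgs.userArgs == List("first", "second"))
  }
}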
