Repository: spark
Updated Branches:
  refs/heads/master 383c5555c -> 912563aa3


http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
new file mode 100644
index 0000000..2cc5abb
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{File, IOException}
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.scalatest.{FunSuite, Matchers}
+
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
+
+class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
+
+  val hasBash =
+    try {
+      val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor()
+      exitCode == 0
+    } catch {
+      case e: IOException =>
+        false
+    }
+
+  if (!hasBash) {
+    logWarning("Cannot execute bash, skipping bash tests.")
+  }
+
+  def bashTest(name: String)(fn: => Unit) =
+    if (hasBash) test(name)(fn) else ignore(name)(fn)
+
+  bashTest("shell script escaping") {
+    val scriptFile = File.createTempFile("script.", ".sh")
+    val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
+    try {
+      val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")
+      Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile)
+      scriptFile.setExecutable(true)
+
+      val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
+      val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim()
+      val err = new String(ByteStreams.toByteArray(proc.getErrorStream()))
+      val exitCode = proc.waitFor()
+      exitCode should be (0)
+      out should be (args.mkString(" "))
+    } finally {
+      scriptFile.delete()
+    }
+  }
+
+  test("Yarn configuration override") {
+    val key = "yarn.nodemanager.hostname"
+    val default = new YarnConfiguration()
+
+    val sparkConf = new SparkConf()
+      .set("spark.hadoop." + key, "someHostName")
+    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
+
+    yarnConf.getClass() should be (classOf[YarnConfiguration])
+    yarnConf.get(key) should not be default.get(key)
+  }
+
+
+  test("test getApplicationAclsForYarn acls on") {
+
+    // spark acls on, just pick up default user
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) => {
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+    modifyAcls match {
+      case Some(macls) => {
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+  }
+
+  test("test getApplicationAclsForYarn acls on and specify users") {
+
+    // default spark acls are on and specify acls
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+    sparkConf.set("spark.ui.view.acls", "user1,user2")
+    sparkConf.set("spark.modify.acls", "user3,user4")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) => {
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user1"))
+        assert(aclSet.contains("user2"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+    modifyAcls match {
+      case Some(macls) => {
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user3"))
+        assert(aclSet.contains("user4"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
deleted file mode 100644
index 8b6521a..0000000
--- a/yarn/stable/pom.xml
+++ /dev/null
@@ -1,95 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>yarn-parent_2.10</artifactId>
-    <version>1.3.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <properties>
-    <sbt.project.name>yarn-stable</sbt.project.name>
-  </properties>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn_2.10</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project YARN Stable API</name>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-tests</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <!--
-    See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
-    dependencies, so they need to be added manually for the tests to work.
-  -->
-  <profiles>
-    <profile>
-      <id>hadoop-2.2</id>
-      <properties>
-        <jersey.version>1.9</jersey.version>
-      </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-          <version>6.1.26</version>
-          <exclusions>
-            <exclusion>
-              <groupId>org.mortbay.jetty</groupId>
-              <artifactId>servlet-api</artifactId>
-            </exclusion>
-          </exclusions>
-          <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>com.sun.jersey</groupId>
-          <artifactId>jersey-core</artifactId>
-          <version>${jersey.version}</version>
-          <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>com.sun.jersey</groupId>
-          <artifactId>jersey-json</artifactId>
-          <version>${jersey.version}</version>
-          <scope>test</scope>
-          <exclusions>
-            <exclusion>
-              <groupId>stax</groupId>
-              <artifactId>stax-api</artifactId>
-            </exclusion>
-          </exclusions>
-        </dependency>
-        <dependency>
-          <groupId>com.sun.jersey</groupId>
-          <artifactId>jersey-server</artifactId>
-          <version>${jersey.version}</version>
-          <scope>test</scope>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
deleted file mode 100644
index addaddb..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.Records
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
-
-/**
- * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
- */
-private[spark] class Client(
-    val args: ClientArguments,
-    val hadoopConf: Configuration,
-    val sparkConf: SparkConf)
-  extends ClientBase with Logging {
-
-  def this(clientArgs: ClientArguments, spConf: SparkConf) =
-    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
-
-  def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
-  val yarnClient = YarnClient.createYarnClient
-  val yarnConf = new YarnConfiguration(hadoopConf)
-
-  def stop(): Unit = yarnClient.stop()
-
-  /* ------------------------------------------------------------------------------------- *
-   | The following methods have much in common in the stable and alpha versions of Client, |
-   | but cannot be implemented in the parent trait due to subtle API differences across    |
-   | hadoop versions.                                                                      |
-   * ------------------------------------------------------------------------------------- */
-
-  /**
-   * Submit an application running our ApplicationMaster to the ResourceManager.
-   *
-   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
-   * creating applications and setting up the application submission context. This was not
-   * available in the alpha API.
-   */
-  override def submitApplication(): ApplicationId = {
-    yarnClient.init(yarnConf)
-    yarnClient.start()
-
-    logInfo("Requesting a new application from cluster with %d NodeManagers"
-      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
-
-    // Get a new application from our RM
-    val newApp = yarnClient.createApplication()
-    val newAppResponse = newApp.getNewApplicationResponse()
-    val appId = newAppResponse.getApplicationId()
-
-    // Verify whether the cluster has enough resources for our AM
-    verifyClusterResources(newAppResponse)
-
-    // Set up the appropriate contexts to launch our AM
-    val containerContext = createContainerLaunchContext(newAppResponse)
-    val appContext = createApplicationSubmissionContext(newApp, containerContext)
-
-    // Finally, submit and monitor the application
-    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
-    yarnClient.submitApplication(appContext)
-    appId
-  }
-
-  /**
-   * Set up the context for submitting our ApplicationMaster.
-   * This uses the YarnClientApplication not available in the Yarn alpha API.
-   */
-  def createApplicationSubmissionContext(
-      newApp: YarnClientApplication,
-      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
-    val appContext = newApp.getApplicationSubmissionContext
-    appContext.setApplicationName(args.appName)
-    appContext.setQueue(args.amQueue)
-    appContext.setAMContainerSpec(containerContext)
-    appContext.setApplicationType("SPARK")
-    val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(args.amMemory + amMemoryOverhead)
-    appContext.setResource(capability)
-    appContext
-  }
-
-  /** Set up security tokens for launching our ApplicationMaster container. */
-  override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-    amContainer.setTokens(ByteBuffer.wrap(dob.getData))
-  }
-
-  /** Get the application report from the ResourceManager for an application we have submitted. */
-  override def getApplicationReport(appId: ApplicationId): ApplicationReport =
-    yarnClient.getApplicationReport(appId)
-
-  /**
-   * Return the security token used by this client to communicate with the ApplicationMaster.
-   * If no security is enabled, the token returned by the report is null.
-   */
-  override def getClientToken(report: ApplicationReport): String =
-    Option(report.getClientToAMToken).map(_.toString).getOrElse("")
-}
-
-object Client {
-  def main(argStrings: Array[String]) {
-    if (!sys.props.contains("SPARK_SUBMIT")) {
-      println("WARNING: This client is deprecated and will be removed in a " +
-        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
-    }
-
-    // Set an env variable indicating we are running in YARN mode.
-    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
-    System.setProperty("SPARK_YARN_MODE", "true")
-    val sparkConf = new SparkConf
-
-    val args = new ClientArguments(argStrings, sparkConf)
-    new Client(args, sparkConf).run()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
deleted file mode 100644
index fdd3c23..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.client.api.NMClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-
-import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.network.util.JavaUtils
-
-
-class ExecutorRunnable(
-    container: Container,
-    conf: Configuration,
-    spConf: SparkConf,
-    masterAddress: String,
-    slaveId: String,
-    hostname: String,
-    executorMemory: Int,
-    executorCores: Int,
-    appId: String,
-    securityMgr: SecurityManager)
-  extends Runnable with ExecutorRunnableUtil with Logging {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  var nmClient: NMClient = _
-  val sparkConf = spConf
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
-  def run = {
-    logInfo("Starting Executor Container")
-    nmClient = NMClient.createNMClient()
-    nmClient.init(yarnConf)
-    nmClient.start()
-    startContainer
-  }
-
-  def startContainer = {
-    logInfo("Setting up ContainerLaunchContext")
-
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-      .asInstanceOf[ContainerLaunchContext]
-
-    val localResources = prepareLocalResources
-    ctx.setLocalResources(localResources)
-
-    ctx.setEnvironment(env)
-
-    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val dob = new DataOutputBuffer()
-    credentials.writeTokenStorageToStream(dob)
-    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
-
-    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appId, localResources)
-
-    logInfo(s"Setting up executor with environment: $env")
-    logInfo("Setting up executor with commands: " + commands)
-    ctx.setCommands(commands)
-
-    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
-
-    // If external shuffle service is enabled, register with the Yarn shuffle service already
-    // started on the NodeManager and, if authentication is enabled, provide it with our secret
-    // key for fetching shuffle files later
-    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
-      val secretString = securityMgr.getSecretKey()
-      val secretBytes =
-        if (secretString != null) {
-          // This conversion must match how the YarnShuffleService decodes our secret
-          JavaUtils.stringToBytes(secretString)
-        } else {
-          // Authentication is not enabled, so just provide dummy metadata
-          ByteBuffer.allocate(0)
-        }
-      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
-    }
-
-    // Send the start request to the ContainerManager
-    nmClient.startContainer(container, ctx)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index 2bbf5d7..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-import org.apache.spark.{SecurityManager, SparkConf} 
-import org.apache.spark.scheduler.SplitInfo
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
-
-/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
- */
-private[yarn] class YarnAllocationHandler(
-    conf: Configuration,
-    sparkConf: SparkConf,
-    amClient: AMRMClient[ContainerRequest],
-    appAttemptId: ApplicationAttemptId,
-    args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]], 
-    securityMgr: SecurityManager)
-  extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
-
-  override protected def releaseContainer(container: Container) = {
-    amClient.releaseAssignedContainer(container.getId())
-  }
-
-  // pending isn't used on stable as the AMRMClient handles incremental asks
-  override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
-    addResourceRequests(count)
-
-    // We have already set the container request. Poll the ResourceManager for a response.
-    // This doubles as a heartbeat if there are no pending container requests.
-    val progressIndicator = 0.1f
-    new StableAllocateResponse(amClient.allocate(progressIndicator))
-  }
-
-  private def createRackResourceRequests(
-      hostContainers: ArrayBuffer[ContainerRequest]
-    ): ArrayBuffer[ContainerRequest] = {
-    // Generate modified racks and new set of hosts under it before issuing requests.
-    val rackToCounts = new HashMap[String, Int]()
-
-    for (container <- hostContainers) {
-      val candidateHost = container.getNodes.last
-      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-      if (rack != null) {
-        var count = rackToCounts.getOrElse(rack, 0)
-        count += 1
-        rackToCounts.put(rack, count)
-      }
-    }
-
-    val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
-    for ((rack, count) <- rackToCounts) {
-      requestedContainers ++= createResourceRequests(
-        AllocationType.RACK,
-        rack,
-        count,
-        YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-    }
-
-    requestedContainers
-  }
-
-  private def addResourceRequests(numExecutors: Int) {
-    val containerRequests: List[ContainerRequest] =
-      if (numExecutors <= 0) {
-        logDebug("numExecutors: " + numExecutors)
-        List()
-      } else if (preferredHostToCount.isEmpty) {
-        logDebug("host preferences is empty")
-        createResourceRequests(
-          AllocationType.ANY,
-          resource = null,
-          numExecutors,
-          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
-      } else {
-        // Request for all hosts in preferred nodes and for numExecutors -
-        // candidates.size, request by default allocation policy.
-        val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
-        for ((candidateHost, candidateCount) <- preferredHostToCount) {
-          val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
-          if (requiredCount > 0) {
-            hostContainerRequests ++= createResourceRequests(
-              AllocationType.HOST,
-              candidateHost,
-              requiredCount,
-              YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-          }
-        }
-        val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
-          hostContainerRequests).toList
-
-        val anyContainerRequests = createResourceRequests(
-          AllocationType.ANY,
-          resource = null,
-          numExecutors,
-          YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-
-        val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
-          hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
-
-        containerRequestBuffer ++= hostContainerRequests
-        containerRequestBuffer ++= rackContainerRequests
-        containerRequestBuffer ++= anyContainerRequests
-        containerRequestBuffer.toList
-      }
-
-    for (request <- containerRequests) {
-      amClient.addContainerRequest(request)
-    }
-
-    for (request <- containerRequests) {
-      val nodes = request.getNodes
-      var hostStr = if (nodes == null || nodes.isEmpty) {
-        "Any"
-      } else {
-        nodes.last
-      }
-      logInfo("Container request (host: %s, priority: %s, capability: %s".format(
-        hostStr,
-        request.getPriority().getPriority,
-        request.getCapability))
-    }
-  }
-
-  private def createResourceRequests(
-      requestType: AllocationType.AllocationType,
-      resource: String,
-      numExecutors: Int,
-      priority: Int
-    ): ArrayBuffer[ContainerRequest] = {
-
-    // If hostname is specified, then we need at least two requests - node local and rack local.
-    // There must be a third request, which is ANY. That will be specially handled.
-    requestType match {
-      case AllocationType.HOST => {
-        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
-        val hostname = resource
-        val nodeLocal = constructContainerRequests(
-          Array(hostname),
-          racks = null,
-          numExecutors,
-          priority)
-
-        // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
-        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
-        nodeLocal
-      }
-      case AllocationType.RACK => {
-        val rack = resource
-        constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
-      }
-      case AllocationType.ANY => constructContainerRequests(
-        hosts = null, racks = null, numExecutors, priority)
-      case _ => throw new IllegalArgumentException(
-        "Unexpected/unsupported request type: " + requestType)
-    }
-  }
-
-  private def constructContainerRequests(
-      hosts: Array[String],
-      racks: Array[String],
-      numExecutors: Int,
-      priority: Int
-    ): ArrayBuffer[ContainerRequest] = {
-
-    val memoryRequest = executorMemory + memoryOverhead
-    val resource = Resource.newInstance(memoryRequest, executorCores)
-
-    val prioritySetting = Records.newRecord(classOf[Priority])
-    prioritySetting.setPriority(priority)
-
-    val requests = new ArrayBuffer[ContainerRequest]()
-    for (i <- 0 until numExecutors) {
-      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
-    }
-    requests
-  }
-
-  private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
-    override def getAllocatedContainers() = response.getAllocatedContainers()
-    override def getAvailableResources() = response.getAvailableResources()
-    override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
deleted file mode 100644
index 8d4b96e..0000000
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.{List => JList}
-
-import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
-import scala.util._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.util.Utils
-
-
-/**
- * YarnRMClient implementation for the Yarn stable API.
- */
-private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
-
-  private var amClient: AMRMClient[ContainerRequest] = _
-  private var uiHistoryAddress: String = _
-  private var registered: Boolean = false
-
-  override def register(
-      conf: YarnConfiguration,
-      sparkConf: SparkConf,
-      preferredNodeLocations: Map[String, Set[SplitInfo]],
-      uiAddress: String,
-      uiHistoryAddress: String,
-      securityMgr: SecurityManager) = {
-    amClient = AMRMClient.createAMRMClient()
-    amClient.init(conf)
-    amClient.start()
-    this.uiHistoryAddress = uiHistoryAddress
-
-    logInfo("Registering the ApplicationMaster")
-    synchronized {
-      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
-      registered = true
-    }
-    new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
-      preferredNodeLocations, securityMgr)
-  }
-
-  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
-    if (registered) {
-      amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
-    }
-  }
-
-  override def getAttemptId() = {
-    val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
-    val containerId = ConverterUtils.toContainerId(containerIdString)
-    val appAttemptId = containerId.getApplicationAttemptId()
-    appAttemptId
-  }
-
-  override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
-    // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
-    // so not all stable releases have it.
-    val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
-        .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
-    // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
-    try {
-      val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
-        classOf[Configuration])
-      val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
-      val hosts = proxies.map { proxy => proxy.split(":")(0) }
-      val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
-      Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
-    } catch {
-      case e: NoSuchMethodException =>
-        val proxy = WebAppUtils.getProxyHostAndPort(conf)
-        val parts = proxy.split(":")
-        val uriBase = prefix + proxy + proxyBase
-        Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
-    }
-  }
-
-  override def getMaxRegAttempts(conf: YarnConfiguration) =
-    conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/resources/log4j.properties b/yarn/stable/src/test/resources/log4j.properties
deleted file mode 100644
index 9dd05f1..0000000
--- a/yarn/stable/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file core/target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=false
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.eclipse.jetty=WARN
-org.eclipse.jetty.LEVEL=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/912563aa/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
deleted file mode 100644
index d79b85e..0000000
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConversions._
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.server.MiniYARNCluster
-
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
-
-class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
-
-  // log4j configuration for the Yarn containers, so that their output is collected
-  // by Yarn instead of trying to overwrite unit-tests.log.
-  private val LOG4J_CONF = """
-    |log4j.rootCategory=DEBUG, console
-    |log4j.appender.console=org.apache.log4j.ConsoleAppender
-    |log4j.appender.console.target=System.err
-    |log4j.appender.console.layout=org.apache.log4j.PatternLayout
-    |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
-    """.stripMargin
-
-  private var yarnCluster: MiniYARNCluster = _
-  private var tempDir: File = _
-  private var fakeSparkJar: File = _
-  private var oldConf: Map[String, String] = _
-
-  override def beforeAll() {
-    tempDir = Utils.createTempDir()
-
-    val logConfDir = new File(tempDir, "log4j")
-    logConfDir.mkdir()
-
-    val logConfFile = new File(logConfDir, "log4j.properties")
-    Files.write(LOG4J_CONF, logConfFile, Charsets.UTF_8)
-
-    val childClasspath = logConfDir.getAbsolutePath() + File.pathSeparator +
-      sys.props("java.class.path")
-
-    oldConf = sys.props.filter { case (k, v) => k.startsWith("spark.") }.toMap
-
-    yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
-    yarnCluster.init(new YarnConfiguration())
-    yarnCluster.start()
-
-    // There's a race in MiniYARNCluster in which start() may return before the RM has updated
-    // its address in the configuration. You can see this in the logs by noticing that when
-    // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
-    // test works sometimes:
-    //
-    //    INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
-    //
-    // That log message prints the contents of the RM_ADDRESS config variable. If you check it
-    // later on, it looks something like this:
-    //
-    //    INFO YarnClusterSuite: RM address in configuration is blah:42631
-    //
-    // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
-    // done so in a timely manner (defined to be 10 seconds).
-    val config = yarnCluster.getConfig()
-    val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
-    while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
-      if (System.currentTimeMillis() > deadline) {
-        throw new IllegalStateException("Timed out waiting for RM to come up.")
-      }
-      logDebug("RM address still not set in configuration, waiting...")
-      TimeUnit.MILLISECONDS.sleep(100)
-    }
-
-    logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
-    config.foreach { e =>
-      sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
-    }
-
-    fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
-    sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
-    sys.props += ("spark.executor.instances" -> "1")
-    sys.props += ("spark.driver.extraClassPath" -> childClasspath)
-    sys.props += ("spark.executor.extraClassPath" -> childClasspath)
-
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    yarnCluster.stop()
-    sys.props.retain { case (k, v) => !k.startsWith("spark.") }
-    sys.props ++= oldConf
-    super.afterAll()
-  }
-
-  test("run Spark in yarn-client mode") {
-    var result = File.createTempFile("result", null, tempDir)
-    YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
-    checkResult(result)
-  }
-
-  test("run Spark in yarn-cluster mode") {
-    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-    var result = File.createTempFile("result", null, tempDir)
-
-    val args = Array("--class", main,
-      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
-      "--arg", "yarn-cluster",
-      "--arg", result.getAbsolutePath(),
-      "--num-executors", "1")
-    Client.main(args)
-    checkResult(result)
-  }
-
-  test("run Spark in yarn-cluster mode unsuccessfully") {
-    val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
-
-    // Use only one argument so the driver will fail
-    val args = Array("--class", main,
-      "--jar", "file:" + fakeSparkJar.getAbsolutePath(),
-      "--arg", "yarn-cluster",
-      "--num-executors", "1")
-    val exception = intercept[SparkException] {
-      Client.main(args)
-    }
-    assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
-  }
-
-  /**
-   * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
-   * any sort of error when the job process finishes successfully, but the job itself fails. So
-   * the tests enforce that something is written to a file after everything is ok to indicate
-   * that the job succeeded.
-   */
-  private def checkResult(result: File) = {
-    var resultString = Files.toString(result, Charsets.UTF_8)
-    resultString should be ("success")
-  }
-
-}
-
-private object YarnClusterDriver extends Logging with Matchers {
-
-  def main(args: Array[String]) = {
-    if (args.length != 2) {
-      System.err.println(
-        s"""
-        |Invalid command line: ${args.mkString(" ")}
-        |
-        |Usage: YarnClusterDriver [master] [result file]
-        """.stripMargin)
-      System.exit(1)
-    }
-
-    val sc = new SparkContext(new SparkConf().setMaster(args(0))
-      .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
-    val status = new File(args(1))
-    var result = "failure"
-    try {
-      val data = sc.parallelize(1 to 4, 4).collect().toSet
-      data should be (Set(1, 2, 3, 4))
-      result = "success"
-    } finally {
-      sc.stop()
-      Files.write(result, status, Charsets.UTF_8)
-    }
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to