http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
new file mode 100644
index 0000000..7fbbe12
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{File, IOException}
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.{ByteStreams, Files}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.scalatest.Matchers
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ResetSystemProperties, Utils}
+
+class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
+  with ResetSystemProperties {
+
+  val hasBash =
+    try {
+      val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor()
+      exitCode == 0
+    } catch {
+      case e: IOException =>
+        false
+    }
+
+  if (!hasBash) {
+    logWarning("Cannot execute bash, skipping bash tests.")
+  }
+
+  def bashTest(name: String)(fn: => Unit): Unit =
+    if (hasBash) test(name)(fn) else ignore(name)(fn)
+
+  bashTest("shell script escaping") {
+    val scriptFile = File.createTempFile("script.", ".sh", Utils.createTempDir())
+    val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
+    try {
+      val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")
+      Files.write(("bash -c \"echo " + argLine + "\"").getBytes(StandardCharsets.UTF_8), scriptFile)
+      scriptFile.setExecutable(true)
+
+      val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
+      val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim()
+      val err = new String(ByteStreams.toByteArray(proc.getErrorStream()))
+      val exitCode = proc.waitFor()
+      exitCode should be (0)
+      out should be (args.mkString(" "))
+    } finally {
+      scriptFile.delete()
+    }
+  }
+
+  test("Yarn configuration override") {
+    val key = "yarn.nodemanager.hostname"
+    val default = new YarnConfiguration()
+
+    val sparkConf = new SparkConf()
+      .set("spark.hadoop." + key, "someHostName")
+    val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf)
+
+    yarnConf.getClass() should be (classOf[YarnConfiguration])
+    yarnConf.get(key) should not be default.get(key)
+  }
+
+
+  test("test getApplicationAclsForYarn acls on") {
+
+    // spark acls on, just pick up default user
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) =>
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      case None =>
+        fail()
+    }
+    modifyAcls match {
+      case Some(macls) =>
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      case None =>
+        fail()
+    }
+  }
+
+  test("test getApplicationAclsForYarn acls on and specify users") {
+
+    // default spark acls are on and specify acls
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+    sparkConf.set("spark.ui.view.acls", "user1,user2")
+    sparkConf.set("spark.modify.acls", "user3,user4")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) =>
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user1"))
+        assert(aclSet.contains("user2"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      case None =>
+        fail()
+    }
+    modifyAcls match {
+      case Some(macls) =>
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user3"))
+        assert(aclSet.contains("user4"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      case None =>
+        fail()
+    }
+
+  }
+
+  test("test expandEnvironment result") {
+    val target = Environment.PWD
+    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}")
+    } else if (Utils.isWindows) {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%")
+    } else {
+      YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target)
+    }
+
+  }
+
+  test("test getClassPathSeparator result") {
+    if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>")
+    } else if (Utils.isWindows) {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be (";")
+    } else {
+      YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
+    }
+  }
+
+  test("check different hadoop utils based on env variable") {
+    try {
+      System.setProperty("SPARK_YARN_MODE", "true")
+      assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil])
+      System.setProperty("SPARK_YARN_MODE", "false")
+      assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil])
+    } finally {
+      System.clearProperty("SPARK_YARN_MODE")
+    }
+  }
+
+
+
+  // This test needs to live here because it depends on isYarnMode returning true, which can only
+  // happen in the YARN module.
+  test("security manager token generation") {
+    try {
+      System.setProperty("SPARK_YARN_MODE", "true")
+      val initial = SparkHadoopUtil.get
+        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
+      assert(initial === null || initial.length === 0)
+
+      val conf = new SparkConf()
+        .set(SecurityManager.SPARK_AUTH_CONF, "true")
+        .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val sm = new SecurityManager(conf)
+
+      val generated = SparkHadoopUtil.get
+        .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY)
+      assert(generated != null)
+      val genString = new Text(generated).toString()
+      assert(genString != "unused")
+      assert(sm.getSecretKey() === genString)
+    } finally {
+      // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret
+      // to an empty string.
+      SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "")
+      System.clearProperty("SPARK_YARN_MODE")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
new file mode 100644
index 0000000..db4619e
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.Token
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
+
+class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+  private var credentialManager: ConfigurableCredentialManager = null
+  private var sparkConf: SparkConf = null
+  private var hadoopConf: Configuration = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sparkConf = new SparkConf()
+    hadoopConf = new Configuration()
+    System.setProperty("SPARK_YARN_MODE", "true")
+  }
+
+  override def afterAll(): Unit = {
+    System.clearProperty("SPARK_YARN_MODE")
+
+    super.afterAll()
+  }
+
+  test("Correctly load default credential providers") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+    credentialManager.getServiceCredentialProvider("hive") should not be (None)
+  }
+
+  test("disable hive credential provider") {
+    sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+    credentialManager.getServiceCredentialProvider("hdfs") should not be (None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+    credentialManager.getServiceCredentialProvider("hive") should be (None)
+  }
+
+  test("using deprecated configurations") {
+    sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false")
+    sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+    credentialManager.getServiceCredentialProvider("hdfs") should be (None)
+    credentialManager.getServiceCredentialProvider("hive") should be (None)
+    credentialManager.getServiceCredentialProvider("test") should not be (None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+  }
+
+  test("verify obtaining credentials from provider") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+    val creds = new Credentials()
+
+    // Tokens can only be obtained from TestTokenProvider, for hdfs, hbase and hive tokens cannot
+    // be obtained.
+    credentialManager.obtainCredentials(hadoopConf, creds)
+    val tokens = creds.getAllTokens
+    tokens.size() should be (1)
+    tokens.iterator().next().getService should be (new Text("test"))
+  }
+
+  test("verify getting credential renewal info") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+    val creds = new Credentials()
+
+    val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
+      .asInstanceOf[TestCredentialProvider]
+    // Only TestTokenProvider can get the time of next token renewal
+    val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
+    nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
+  }
+
+  test("obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // thrift picks up on port 0 and bails out, without trying to talk to endpoint
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+
+    val hiveCredentialProvider = new HiveCredentialProvider()
+    val credentials = new Credentials()
+    hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
+
+    credentials.getAllTokens.size() should be (0)
+  }
+
+  test("Obtain tokens For HBase") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hbase.security.authentication", "kerberos")
+
+    val hbaseTokenProvider = new HBaseCredentialProvider()
+    val creds = new Credentials()
+    hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
+
+    creds.getAllTokens.size should be (0)
+  }
+}
+
+class TestCredentialProvider extends ServiceCredentialProvider {
+  val tokenRenewalInterval = 86400 * 1000L
+  var timeOfNextTokenRenewal = 0L
+
+  override def serviceName: String = "test"
+
+  override def credentialsRequired(conf: Configuration): Boolean = true
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    if (creds == null) {
+      // Guard out other unit test failures.
+      return None
+    }
+
+    val emptyToken = new Token()
+    emptyToken.setService(new Text("test"))
+    creds.addToken(emptyToken.getService, emptyToken)
+
+    val currTime = System.currentTimeMillis()
+    timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
+
+    Some(timeOfNextTokenRenewal)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
new file mode 100644
index 0000000..7b2da3f
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.{Matchers, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+
+class HDFSCredentialProviderSuite
+    extends SparkFunSuite
+    with PrivateMethodTester
+    with Matchers {
+  private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
+
+  private def getTokenRenewer(
+      hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = {
+    hdfsCredentialProvider invokePrivate _getTokenRenewer(conf)
+  }
+
+  private var hdfsCredentialProvider: HDFSCredentialProvider = null
+
+  override def beforeAll() {
+    super.beforeAll()
+
+    if (hdfsCredentialProvider == null) {
+      hdfsCredentialProvider = new HDFSCredentialProvider()
+    }
+  }
+
+  override def afterAll() {
+    if (hdfsCredentialProvider != null) {
+      hdfsCredentialProvider = null
+    }
+
+    super.afterAll()
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:[email protected]")
+    val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+    renewer should be ("yarn/myrm:[email protected]")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        getTokenRenewer(hdfsCredentialProvider, hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
new file mode 100644
index 0000000..da9e8e2
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import java.util.{List => JList, Map => JMap}
+
+/**
+ * Exposes AbstractCommandBuilder to the YARN tests, so that they can build classpaths the same
+ * way other cluster managers do.
+ */
+private[spark] class TestClasspathBuilder extends AbstractCommandBuilder {
+
+  childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sys.props("spark.test.home"))
+
+  override def buildClassPath(extraCp: String): JList[String] = super.buildClassPath(extraCp)
+
+  /** Not used by the YARN tests. */
+  override def buildCommand(env: JMap[String, String]): JList[String] =
+    throw new UnsupportedOperationException()
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
new file mode 100644
index 0000000..1fed256
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.shuffle
+
+import java.io.File
+import java.util.concurrent.ConcurrentMap
+
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.fusesource.leveldbjni.JniDBFactory
+import org.iq80.leveldb.{DB, Options}
+
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+
+/**
+ * just a cheat to get package-visible members in tests
+ */
+object ShuffleTestAccessor {
+
+  def getBlockResolver(handler: ExternalShuffleBlockHandler): ExternalShuffleBlockResolver = {
+    handler.blockManager
+  }
+
+  def getExecutorInfo(
+      appId: ApplicationId,
+      execId: String,
+      resolver: ExternalShuffleBlockResolver
+  ): Option[ExecutorShuffleInfo] = {
+    val id = new AppExecId(appId.toString, execId)
+    Option(resolver.executors.get(id))
+  }
+
+  def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
+    resolver.registeredExecutorFile
+  }
+
+  def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
+    resolver.db
+  }
+
+  def reloadRegisteredExecutors(
+    file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
+    val options: Options = new Options
+    options.createIfMissing(true)
+    val factory = new JniDBFactory
+    val db = factory.open(file, options)
+    val result = ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
+    db.close()
+    result
+  }
+
+  def reloadRegisteredExecutors(
+      db: DB): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
+    ExternalShuffleBlockResolver.reloadRegisteredExecutors(db)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
new file mode 100644
index 0000000..a58784f
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.yarn
+
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.ByteBuffer
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission._
+import java.util.EnumSet
+
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.service.ServiceStateException
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.shuffle.ShuffleTestAccessor
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.util.Utils
+
+class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
+  private[yarn] var yarnConfig: YarnConfiguration = null
+  private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    yarnConfig = new YarnConfiguration()
+    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
+    yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
+      classOf[YarnShuffleService].getCanonicalName)
+    yarnConfig.setInt("spark.shuffle.service.port", 0)
+    yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+    val localDir = Utils.createTempDir()
+    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
+  }
+
+  var s1: YarnShuffleService = null
+  var s2: YarnShuffleService = null
+  var s3: YarnShuffleService = null
+
+  override def afterEach(): Unit = {
+    try {
+      if (s1 != null) {
+        s1.stop()
+        s1 = null
+      }
+      if (s2 != null) {
+        s2.stop()
+        s2 = null
+      }
+      if (s3 != null) {
+        s3.stop()
+        s3 = null
+      }
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("executor state kept across NM restart") {
+    s1 = new YarnShuffleService
+    // set auth to true to test the secrets recovery
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
+    s1.init(yarnConfig)
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Data = makeAppInfo("user", app2Id)
+    s1.initializeApplication(app2Data)
+
+    val execStateFile = s1.registeredExecutorFile
+    execStateFile should not be (null)
+    val secretsFile = s1.secretsFile
+    secretsFile should not be (null)
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+      be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+      be (Some(shuffleInfo2))
+
+    if (!execStateFile.exists()) {
+      @tailrec def findExistingParent(file: File): File = {
+        if (file == null) file
+        else if (file.exists()) file
+        else findExistingParent(file.getParentFile())
+      }
+      val existingParent = findExistingParent(execStateFile)
+      assert(false, s"$execStateFile does not exist -- closest existing parent is $existingParent")
+    }
+    assert(execStateFile.exists(), s"$execStateFile did not exist")
+
+    // now we pretend the shuffle service goes down, and comes back up
+    s1.stop()
+    s2 = new YarnShuffleService
+    s2.init(yarnConfig)
+    s2.secretsFile should be (secretsFile)
+    s2.registeredExecutorFile should be (execStateFile)
+
+    val handler2 = s2.blockHandler
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+    // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
+    // during the restart
+    s2.initializeApplication(app1Data)
+    s2.stopApplication(new ApplicationTerminationContext(app2Id))
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+
+    // Act like the NM restarts one more time
+    s2.stop()
+    s3 = new YarnShuffleService
+    s3.init(yarnConfig)
+    s3.registeredExecutorFile should be (execStateFile)
+    s3.secretsFile should be (secretsFile)
+
+    val handler3 = s3.blockHandler
+    val resolver3 = ShuffleTestAccessor.getBlockResolver(handler3)
+
+    // app1 is still running
+    s3.initializeApplication(app1Data)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver3) should be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (None)
+    s3.stop()
+  }
+
+  test("removed applications should not be in registered executor file") {
+    s1 = new YarnShuffleService
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+    s1.init(yarnConfig)
+    val secretsFile = s1.secretsFile
+    secretsFile should be (null)
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Data = makeAppInfo("user", app2Id)
+    s1.initializeApplication(app2Data)
+
+    val execStateFile = s1.registeredExecutorFile
+    execStateFile should not be (null)
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+
+    val db = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
+    ShuffleTestAccessor.reloadRegisteredExecutors(db) should not be empty
+
+    s1.stopApplication(new ApplicationTerminationContext(app1Id))
+    ShuffleTestAccessor.reloadRegisteredExecutors(db) should not be empty
+    s1.stopApplication(new ApplicationTerminationContext(app2Id))
+    ShuffleTestAccessor.reloadRegisteredExecutors(db) shouldBe empty
+  }
+
+  test("shuffle service should be robust to corrupt registered executor file") {
+    s1 = new YarnShuffleService
+    s1.init(yarnConfig)
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+
+    val execStateFile = s1.registeredExecutorFile
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+
+    // now we pretend the shuffle service goes down, and comes back up.  But we'll also
+    // make a corrupt registeredExecutor File
+    s1.stop()
+
+    execStateFile.listFiles().foreach{_.delete()}
+
+    val out = new DataOutputStream(new FileOutputStream(execStateFile + "/CURRENT"))
+    out.writeInt(42)
+    out.close()
+
+    s2 = new YarnShuffleService
+    s2.init(yarnConfig)
+    s2.registeredExecutorFile should be (execStateFile)
+
+    val handler2 = s2.blockHandler
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+    // we re-initialize app1, but since the file was corrupt there is nothing we can do about it ...
+    s2.initializeApplication(app1Data)
+    // however, when we initialize a totally new app2, everything is still happy
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Data = makeAppInfo("user", app2Id)
+    s2.initializeApplication(app2Data)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+    resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
+    s2.stop()
+
+    // another stop & restart should be fine though (eg., we recover from previous corruption)
+    s3 = new YarnShuffleService
+    s3.init(yarnConfig)
+    s3.registeredExecutorFile should be (execStateFile)
+    val handler3 = s3.blockHandler
+    val resolver3 = ShuffleTestAccessor.getBlockResolver(handler3)
+
+    s3.initializeApplication(app2Data)
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
+    s3.stop()
+  }
+
+  test("get correct recovery path") {
+    // Test recovery path is set outside the shuffle service, this is to simulate NM recovery
+    // enabled scenario, where recovery path will be set by yarn.
+    s1 = new YarnShuffleService
+    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    s1.setRecoveryPath(recoveryPath)
+
+    s1.init(yarnConfig)
+    s1._recoveryPath should be (recoveryPath)
+    s1.stop()
+
+    // Test recovery path is set inside the shuffle service; this happens when NM
+    // recovery is not enabled or there's no NM recovery (Hadoop 2.5-).
+    s2 = new YarnShuffleService
+    s2.init(yarnConfig)
+    s2._recoveryPath should be
+      (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
+    s2.stop()
+  }
+
+  test("moving recovery file from NM local dir to recovery path") {
+    // This tests that when Hadoop is upgraded to 2.5+ and NM recovery is enabled, we move the
+    // old recovery file to the new path to keep compatibility.
+
+    // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
+    // dir.
+    s1 = new YarnShuffleService
+    // set auth to true to test the secrets recovery
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
+    s1.init(yarnConfig)
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Data = makeAppInfo("user", app2Id)
+    s1.initializeApplication(app2Data)
+
+    assert(s1.secretManager.getSecretKey(app1Id.toString()) != null)
+    assert(s1.secretManager.getSecretKey(app2Id.toString()) != null)
+
+    val execStateFile = s1.registeredExecutorFile
+    execStateFile should not be (null)
+    val secretsFile = s1.secretsFile
+    secretsFile should not be (null)
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+      be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+      be (Some(shuffleInfo2))
+
+    assert(execStateFile.exists(), s"$execStateFile did not exist")
+
+    s1.stop()
+
+    // Simulate s2 running on Hadoop 2.5+ with NM recovery enabled.
+    assert(execStateFile.exists())
+    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    s2 = new YarnShuffleService
+    s2.setRecoveryPath(recoveryPath)
+    s2.init(yarnConfig)
+
+    // Ensure that s2 has loaded known apps from the secrets db.
+    assert(s2.secretManager.getSecretKey(app1Id.toString()) != null)
+    assert(s2.secretManager.getSecretKey(app2Id.toString()) != null)
+
+    val execStateFile2 = s2.registeredExecutorFile
+    val secretsFile2 = s2.secretsFile
+
+    recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+    recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
+    eventually(timeout(10 seconds), interval(5 millis)) {
+      assert(!execStateFile.exists())
+    }
+    eventually(timeout(10 seconds), interval(5 millis)) {
+      assert(!secretsFile.exists())
+    }
+
+    val handler2 = s2.blockHandler
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+    // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
+    // during the restart
+    // Since the recovery file is read from the old path, the previous state should be preserved.
+    s2.initializeApplication(app1Data)
+    s2.stopApplication(new ApplicationTerminationContext(app2Id))
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+
+    s2.stop()
+  }
+
+  test("service throws error if cannot start") {
+    // Set up a read-only local dir.
+    val roDir = Utils.createTempDir()
+    Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
+    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
+
+    // Try to start the shuffle service, it should fail.
+    val service = new YarnShuffleService()
+
+    try {
+      val error = intercept[ServiceStateException] {
+        service.init(yarnConfig)
+      }
+      assert(error.getCause().isInstanceOf[IOException])
+    } finally {
+      service.stop()
+      Files.setPosixFilePermissions(roDir.toPath(),
+        EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE))
+    }
+  }
+
+  private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
+    val secret = ByteBuffer.wrap(new Array[Byte](0))
+    new ApplicationInitializationContext(user, appId, secret)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
new file mode 100644
index 0000000..db322cd
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnTestAccessor.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.yarn
+
+import java.io.File
+
+/**
+ * just a cheat to get package-visible members in tests
+ */
+object YarnTestAccessor {
+  def getShuffleServicePort: Int = {
+    YarnShuffleService.boundPort
+  }
+
+  def getShuffleServiceInstance: YarnShuffleService = {
+    YarnShuffleService.instance
+  }
+
+  def getRegisteredExecutorFile(service: YarnShuffleService): File = {
+    service.registeredExecutorFile
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
new file mode 100644
index 0000000..6ea7984
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+
+/**
+ * Test the integration with [[SchedulerExtensionServices]]
+ */
+class ExtensionServiceIntegrationSuite extends SparkFunSuite
+  with LocalSparkContext with BeforeAndAfter
+  with Logging {
+
+  val applicationId = new StubApplicationId(0, 1111L)
+  val attemptId = new StubApplicationAttemptId(applicationId, 1)
+
+  /*
+   * Setup phase creates the spark context
+   */
+  before {
+    val sparkConf = new SparkConf()
+    sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
+    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
+    sc = new SparkContext(sparkConf)
+  }
+
+  test("Instantiate") {
+    val services = new SchedulerExtensionServices()
+    assertResult(Nil, "non-nil service list") {
+      services.getServices
+    }
+    services.start(SchedulerExtensionServiceBinding(sc, applicationId))
+    services.stop()
+  }
+
+  test("Contains SimpleExtensionService Service") {
+    val services = new SchedulerExtensionServices()
+    try {
+      services.start(SchedulerExtensionServiceBinding(sc, applicationId))
+      val serviceList = services.getServices
+      assert(serviceList.nonEmpty, "empty service list")
+      val (service :: Nil) = serviceList
+      val simpleService = service.asInstanceOf[SimpleExtensionService]
+      assert(simpleService.started.get, "service not started")
+      services.stop()
+      assert(!simpleService.started.get, "service not stopped")
+    } finally {
+      services.stop()
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
new file mode 100644
index 0000000..9b8c98c
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+private[spark] class SimpleExtensionService extends SchedulerExtensionService {
+
+  /** started flag; set in the `start()` call, stopped in `stop()`. */
+  val started = new AtomicBoolean(false)
+
+  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
+    started.set(true)
+  }
+
+  override def stop(): Unit = {
+    started.set(false)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
new file mode 100644
index 0000000..4b57b95
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+
+/**
+ * A stub application ID; can be set in constructor and/or updated later.
+ * @param applicationId application ID
+ * @param attempt an attempt counter
+ */
+class StubApplicationAttemptId(var applicationId: ApplicationId, var attempt: Int)
+    extends ApplicationAttemptId {
+
+  override def setApplicationId(appID: ApplicationId): Unit = {
+    applicationId = appID
+  }
+
+  override def getAttemptId: Int = {
+    attempt
+  }
+
+  override def setAttemptId(attemptId: Int): Unit = {
+    attempt = attemptId
+  }
+
+  override def getApplicationId: ApplicationId = {
+    applicationId
+  }
+
+  override def build(): Unit = {
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
new file mode 100644
index 0000000..bffa0e0
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.ApplicationId
+
+/**
+ * Simple Testing Application Id; ID and cluster timestamp are set in constructor
+ * and cannot be updated.
+ * @param id app id
+ * @param clusterTimestamp timestamp
+ */
+private[spark] class StubApplicationId(id: Int, clusterTimestamp: Long) extends ApplicationId {
+  override def getId: Int = {
+    id
+  }
+
+  override def getClusterTimestamp: Long = {
+    clusterTimestamp
+  }
+
+  override def setId(id: Int): Unit = {}
+
+  override def setClusterTimestamp(clusterTimestamp: Long): Unit = {}
+
+  override def build(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/pom.xml
----------------------------------------------------------------------
diff --git a/yarn/pom.xml b/yarn/pom.xml
deleted file mode 100644
index 91e3437..0000000
--- a/yarn/pom.xml
+++ /dev/null
@@ -1,215 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>spark-yarn_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project YARN</name>
-  <properties>
-    <sbt.project.name>yarn</sbt.project.name>
-    <jersey-1.version>1.9</jersey-1.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-network-yarn_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-    </dependency>
-
-    <!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-server</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-plus</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-util</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-http</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlet</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty</groupId>
-      <artifactId>jetty-servlets</artifactId>
-    </dependency>
-    <!-- End of shaded deps. -->
-
-    <!--
-      SPARK-10059: Explicitly add JSP dependencies for tests since the MiniYARN cluster needs them.
-    -->
-    <dependency>
-      <groupId>org.eclipse.jetty.orbit</groupId>
-      <artifactId>javax.servlet.jsp</artifactId>
-      <version>2.2.0.v201112011158</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.eclipse.jetty.orbit</groupId>
-      <artifactId>javax.servlet.jsp.jstl</artifactId>
-      <version>1.2.0.v201105211821</version>
-      <scope>test</scope>
-    </dependency>
-
-     <!--
-    See SPARK-3710. hadoop-yarn-server-tests in Hadoop 2.2 fails to pull some needed
-    dependencies, so they need to be added manually for the tests to work.
-    -->
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-tests</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mortbay.jetty</groupId>
-      <artifactId>jetty</artifactId>
-      <version>6.1.26</version>
-      <exclusions>
-       <exclusion>
-        <groupId>org.mortbay.jetty</groupId>
-         <artifactId>servlet-api</artifactId>
-       </exclusion>
-      </exclusions>
-      <scope>test</scope>
-     </dependency>
-
-     <!--
-       Jersey 1 dependencies only required for YARN integration testing. Creating a YARN cluster
-       in the JVM requires starting a Jersey 1-based web application.
-     -->
-     <dependency>
-       <groupId>com.sun.jersey</groupId>
-       <artifactId>jersey-core</artifactId>
-       <scope>test</scope>
-       <version>${jersey-1.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>com.sun.jersey</groupId>
-       <artifactId>jersey-json</artifactId>
-       <scope>test</scope>
-       <version>${jersey-1.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>com.sun.jersey</groupId>
-       <artifactId>jersey-server</artifactId>
-       <scope>test</scope>
-       <version>${jersey-1.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>com.sun.jersey.contribs</groupId>
-       <artifactId>jersey-guice</artifactId>
-       <scope>test</scope>
-       <version>${jersey-1.version}</version>
-     </dependency>
-
-    <!--
-     Testing Hive reflection needs hive on the test classpath only.
-     It doesn't need the spark hive modules, so the -Phive flag is not checked.
-      -->
-    <dependency>
-      <groupId>${hive.group}</groupId>
-      <artifactId>hive-exec</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>${hive.group}</groupId>
-      <artifactId>hive-metastore</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libthrift</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libfb303</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git a/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
deleted file mode 100644
index 22ead56..0000000
--- a/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ /dev/null
@@ -1,3 +0,0 @@
-org.apache.spark.deploy.yarn.security.HDFSCredentialProvider
-org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
-org.apache.spark.deploy.yarn.security.HiveCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git a/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
deleted file mode 100644
index 6e8a1eb..0000000
--- a/yarn/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.spark.scheduler.cluster.YarnClusterManager

