http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
new file mode 100644
index 0000000..7e76f40
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn.security
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The following methods are primarily meant to make sure long-running apps like Spark
+ * Streaming apps can run without interruption while accessing secured services. The
+ * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
+ * This method wakes up a thread that logs into the KDC
+ * once 75% of the renewal interval of the original credentials used for the container
+ * has elapsed. It then obtains new credentials and writes them to HDFS in a
+ * pre-specified location - the prefix of which is specified in the sparkConf by
+ * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
+ * - each update goes to a new file, with a monotonically increasing suffix). The
+ * timestamp1, timestamp2 here indicate the time of the next update for CredentialUpdater.
+ * After this, the credentials are renewed once 75% of the new tokens' renewal interval has
+ * elapsed.
+ *
+ * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
+ * called once 80% of the validity of the original credentials has elapsed. At that time the
+ * executor finds the credentials file with the latest timestamp and checks if it has read those
+ * credentials before (by keeping track of the suffix of the last file it read). If a new file has
+ * appeared, it will read the credentials and update the currently running UGI with it. This
+ * process happens again once 80% of the validity of the new credentials has elapsed.
+ */
+private[yarn] class AMCredentialRenewer(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
+    credentialManager: ConfigurableCredentialManager) extends Logging {
+
+  // Counter suffix of the newest credentials file known to this instance. 0 means nothing has
+  // been written yet by this instance (fresh start or AM restart).
+  private var lastCredentialsFileSuffix = 0
+
+  // Single-threaded scheduler on which the renewal task is (re-)scheduled.
+  private val credentialRenewer =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  private val hadoopUtil = YarnSparkHadoopUtil.get
+
+  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+  private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
+  private val numFilesToKeep = sparkConf.get(CREDENTIAL_FILE_MAX_COUNT)
+  private val freshHadoopConf =
+    hadoopUtil.getConfBypassingFSCache(hadoopConf, new Path(credentialsFile).toUri.getScheme)
+
+  // Absolute time (epoch millis) at which the next renewal should run.
+  @volatile private var timeOfNextRenewal = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+
+  /**
+   * Schedule a login from the keytab and principal set using the --principal and --keytab
+   * arguments to spark-submit. The login happens when `timeOfNextRenewal` is reached; if that
+   * time has already passed, the renewal runs synchronously on the calling thread. This method
+   * reads spark.yarn.principal and spark.yarn.keytab from SparkConf to do the login.
+   *
+   * @throws NoSuchElementException if the principal or the keytab is not set in SparkConf
+   */
+  private[spark] def scheduleLoginFromKeytab(): Unit = {
+    val principal = sparkConf.get(PRINCIPAL).getOrElse(
+      throw new NoSuchElementException(
+        "Principal (spark.yarn.principal) must be set to schedule a login from keytab"))
+    val keytab = sparkConf.get(KEYTAB).getOrElse(
+      throw new NoSuchElementException(
+        "Keytab (spark.yarn.keytab) must be set to schedule a login from keytab"))
+
+    /**
+     * Schedule re-login and creation of new credentials. If credentials have already expired,
+     * this method will synchronously create new ones.
+     */
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      // Run now!
+      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      if (remainingTime <= 0) {
+        logInfo("Credentials have expired, creating new ones now.")
+        runnable.run()
+      } else {
+        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+        credentialRenewer.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
+      }
+    }
+
+    // This task periodically runs on the AM to update the credentials on HDFS.
+    val credentialRenewerRunnable =
+      new Runnable {
+        override def run(): Unit = {
+          val renewed = try {
+            writeNewCredentialsToHDFS(principal, keytab)
+            cleanupOldFiles()
+            true
+          } catch {
+            case e: Exception =>
+              // Log the error and try to write new tokens back in an hour
+              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+                "hour! If this happens too often tasks will fail.", e)
+              credentialRenewer.schedule(this, 1, TimeUnit.HOURS)
+              false
+          }
+          // Only follow the regular schedule after a successful write; the failure path has
+          // already queued its own retry above.
+          if (renewed) {
+            scheduleRenewal(this)
+          }
+        }
+      }
+    // Schedule update of credentials. This handles the case of updating the credentials right
+    // now as well: when the renewal time has already passed, scheduleRenewal runs the task
+    // immediately.
+    scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
+  // least numFilesToKeep files are kept for safety
+  private def cleanupOldFiles(): Unit = {
+    import scala.concurrent.duration._
+    try {
+      val remoteFs = FileSystem.get(freshHadoopConf)
+      val credentialsPath = new Path(credentialsFile)
+      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles.days).toMillis
+      hadoopUtil.listFilesSorted(
+        remoteFs, credentialsPath.getParent,
+        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .dropRight(numFilesToKeep)
+        .takeWhile(_.getModificationTime < thresholdTime)
+        .foreach(x => remoteFs.delete(x.getPath, true))
+    } catch {
+      // Such errors are not fatal, so don't throw. Make sure they are logged though
+      case e: Exception =>
+        logWarning("Error while attempting to cleanup old credentials. If you are seeing many " +
+          "such warnings there may be an issue with your HDFS cluster.", e)
+    }
+  }
+
+  /**
+   * Log in from the keytab, obtain fresh credentials from all providers, add them to the current
+   * user, and publish them as a new credentials file (written to a temp file, then renamed).
+   * Also updates `timeOfNextRenewal` from the nearest renewal time reported by the providers.
+   *
+   * @param principal Kerberos principal to log in as
+   * @param keytab keytab file name, relative to the AM working directory
+   * @throws java.io.IOException if the temp file cannot be renamed to its final name
+   */
+  private def writeNewCredentialsToHDFS(principal: String, keytab: String): Unit = {
+    // Keytab is copied by YARN to the working directory of the AM, so full path is
+    // not needed.
+
+    // HACK:
+    // HDFS will not issue new delegation tokens, if the Credentials object
+    // passed in already has tokens for that FS even if the tokens are expired (it really only
+    // checks if there are tokens for the service, and not if they are valid). So the only real
+    // way to get new tokens is to make sure a different Credentials object is used each time to
+    // get new tokens and then the new tokens are copied over the current user's Credentials.
+    // So:
+    // - we login as a different user and get the UGI
+    // - use that UGI to get the tokens (see doAs block below)
+    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
+    // in the current user's Credentials object for this FS).
+    // The login to KDC happens each time new tokens are required, but this is rare enough to not
+    // have to worry about (like once every day or so). This makes this code clearer than having
+    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
+    // UGI directly for HDFS communication).
+    logInfo(s"Attempting to login to KDC using principal: $principal")
+    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+    logInfo("Successfully logged into KDC.")
+    val tempCreds = keytabLoggedInUGI.getCredentials
+    val credentialsPath = new Path(credentialsFile)
+    var nearestNextRenewalTime = Long.MaxValue
+    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
+      // Get a copy of the credentials
+      override def run(): Void = {
+        nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds)
+        null
+      }
+    })
+
+    val currTime = System.currentTimeMillis()
+    val timeOfNextUpdate = if (nearestNextRenewalTime <= currTime) {
+      // If the next renewal time is not later than the current time, set both the next renewal
+      // time and the next update time to the current time, which triggers the next renewal
+      // immediately. There is still a gap between token renewal and update, which could
+      // potentially cause issues.
+      logWarning(s"Next credential renewal time ($nearestNextRenewalTime) is earlier than " +
+        s"current time ($currTime), which is unexpected, please check your credential renewal " +
+        "related configurations in the target services.")
+      timeOfNextRenewal = currTime
+      currTime
+    } else {
+      // Next valid renewal time is about 75% of credential renewal time, and update time is
+      // slightly later than valid renewal time (80% of renewal time).
+      timeOfNextRenewal = ((nearestNextRenewalTime - currTime) * 0.75 + currTime).toLong
+      ((nearestNextRenewalTime - currTime) * 0.8 + currTime).toLong
+    }
+
+    // Add the temp credentials back to the original ones.
+    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
+    val remoteFs = FileSystem.get(freshHadoopConf)
+    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
+    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
+    // and update the lastCredentialsFileSuffix.
+    if (lastCredentialsFileSuffix == 0) {
+      hadoopUtil.listFilesSorted(
+        remoteFs, credentialsPath.getParent,
+        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.foreach { status =>
+        lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
+      }
+    }
+    val nextSuffix = lastCredentialsFileSuffix + 1
+
+    // File name layout: <credentialsFile><delim><timeOfNextUpdate><delim><suffix>
+    val tokenPathStr =
+      credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
+        timeOfNextUpdate.toString + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM +
+          nextSuffix
+    val tokenPath = new Path(tokenPathStr)
+    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+
+    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    credentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)
+    logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")
+    // FileSystem.rename reports failure through its return value. Treat a failed rename as an
+    // error so the suffix is not advanced and the caller's retry logic kicks in.
+    if (!remoteFs.rename(tempTokenPath, tokenPath)) {
+      throw new java.io.IOException(
+        s"Failed to rename credentials file $tempTokenPath to $tokenPath")
+    }
+    logInfo("Delegation token file rename complete.")
+    lastCredentialsFileSuffix = nextSuffix
+  }
+
+  /** Shut down the renewal scheduler; tasks that are already queued are not awaited here. */
+  def stop(): Unit = {
+    credentialRenewer.shutdown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
new file mode 100644
index 0000000..c4c07b4
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * A ConfigurableCredentialManager to manage all the registered credential providers and offer
+ * APIs for other modules to obtain credentials as well as renewal time. By default
+ * [[HDFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+ * be loaded in if not explicitly disabled. Any plugged-in credential provider that wants to be
+ * managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
+ * interface and be registered under resources/META-INF/services so that it can be loaded by
+ * ServiceLoader.
+ *
+ * Each credential provider is also controlled by
+ * spark.yarn.security.credentials.{service}.enabled; it will not be loaded in if set to false.
+ */
+private[yarn] final class ConfigurableCredentialManager(
+    sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
+  private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
+  private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
+
+  // All enabled credential providers discovered through ServiceLoader, keyed by service name.
+  private val credentialProviders = {
+    val loadedProviders = ServiceLoader.load(
+      classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader).asScala
+
+    loadedProviders
+      .filter(isProviderEnabled)
+      .map(provider => provider.serviceName -> provider)
+      .toMap
+  }
+
+  /**
+   * Decide whether a provider is enabled. The current config key wins; the deprecated key is
+   * consulted (with a warning) only when the current one is unset. Providers are enabled when
+   * neither key is set.
+   */
+  private def isProviderEnabled(provider: ServiceCredentialProvider): Boolean = {
+    val service = provider.serviceName
+    val currentSetting = sparkConf.getOption(providerEnabledConfig.format(service))
+    val effectiveSetting = currentSetting.orElse {
+      sparkConf.getOption(deprecatedProviderEnabledConfig.format(service)).map { legacyValue =>
+        logWarning(s"${deprecatedProviderEnabledConfig.format(service)} is deprecated, " +
+          s"using ${providerEnabledConfig.format(service)} instead")
+        legacyValue
+      }
+    }
+    effectiveSetting.map(_.toBoolean).getOrElse(true)
+  }
+
+  /**
+   * Look up the enabled credential provider registered under the given service name.
+   */
+  def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
+    credentialProviders.get(service)
+  }
+
+  /**
+   * Obtain credentials from every registered provider that reports it needs them.
+   * @return the smallest renewal time reported by any provider, or Long.MaxValue when no
+   *         provider reports a renewal time.
+   */
+  def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
+    val renewalTimes = credentialProviders.values.flatMap { provider =>
+      if (!provider.credentialsRequired(hadoopConf)) {
+        logDebug(s"Service ${provider.serviceName} does not require a token." +
+          s" Check your configuration to see if security is disabled or not.")
+        None
+      } else {
+        provider.obtainCredentials(hadoopConf, sparkConf, creds)
+      }
+    }
+    renewalTimes.foldLeft(Long.MaxValue)(math.min)
+  }
+
+  /**
+   * Create an [[AMCredentialRenewer]] instance. The caller is responsible for stopping the
+   * instance when it is no longer used. The AM uses it to renew credentials periodically.
+   */
+  def credentialRenewer(): AMCredentialRenewer = {
+    new AMCredentialRenewer(sparkConf, hadoopConf, this)
+  }
+
+  /**
+   * Create a [[CredentialUpdater]] instance. The caller is responsible for stopping the
+   * instance when it is no longer used. Executors and the driver (client mode) use it to update
+   * credentials periodically.
+   */
+  def credentialUpdater(): CredentialUpdater = {
+    new CredentialUpdater(sparkConf, hadoopConf, this)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
new file mode 100644
index 0000000..5df4fbd
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Periodically looks for a newer credentials file on the remote file system and, when one is
+ * found, adds its tokens to the current user's credentials.
+ */
+private[spark] class CredentialUpdater(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration,
+    credentialManager: ConfigurableCredentialManager) extends Logging {
+
+  // Counter suffix of the last credentials file that was applied; only files with a larger
+  // suffix are read.
+  @volatile private var lastCredentialsFileSuffix = 0
+
+  private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
+  private val freshHadoopConf =
+    SparkHadoopUtil.get.getConfBypassingFSCache(
+      hadoopConf, new Path(credentialsFile).toUri.getScheme)
+
+  private val credentialUpdater =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  // This task wakes up and picks up new credentials from HDFS, if any.
+  private val credentialUpdaterRunnable =
+    new Runnable {
+      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+    }
+
+  /**
+   * Start the credential updater task. The first run is scheduled for the configured renewal
+   * time, or one minute from now when that time has already passed.
+   */
+  def start(): Unit = {
+    val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
+    val remainingTime = startTime - System.currentTimeMillis()
+    if (remainingTime <= 0) {
+      credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
+    } else {
+      logInfo(s"Scheduling credentials refresh from HDFS in $remainingTime millis.")
+      credentialUpdater.schedule(credentialUpdaterRunnable, remainingTime, TimeUnit.MILLISECONDS)
+    }
+  }
+
+  /**
+   * Read the newest credentials file if it has not been applied yet, then re-schedule this task.
+   * The delay until the next run is taken from the file name when a new file was applied, and
+   * falls back to fixed delays otherwise (see the inline comments).
+   */
+  private def updateCredentialsIfRequired(): Unit = {
+    val timeToNextUpdate = try {
+      val credentialsFilePath = new Path(credentialsFile)
+      val remoteFs = FileSystem.get(freshHadoopConf)
+      SparkHadoopUtil.get.listFilesSorted(
+        remoteFs, credentialsFilePath.getParent,
+        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.map { credentialsStatus =>
+          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+          if (suffix > lastCredentialsFileSuffix) {
+            logInfo("Reading new credentials from " + credentialsStatus.getPath)
+            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+            lastCredentialsFileSuffix = suffix
+            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+            logInfo("Credentials updated from credentials file.")
+
+            // Keep the subtraction on a single line: a line that starts with `-` is parsed as
+            // a separate statement, which would leave the absolute timestamp as the delay.
+            val timeOfNextUpdate = getTimeOfNextUpdateFromFileName(credentialsStatus.getPath)
+            val remainingTime = timeOfNextUpdate - System.currentTimeMillis()
+            if (remainingTime <= 0) TimeUnit.MINUTES.toMillis(1) else remainingTime
+          } else {
+            // If current credential file is older than expected, sleep 1 hour and check again.
+            TimeUnit.HOURS.toMillis(1)
+          }
+      }.getOrElse {
+        // Wait for 1 minute to check again if there's no credential file currently
+        TimeUnit.MINUTES.toMillis(1)
+      }
+    } catch {
+      // Since the file may get deleted while we are reading it, catch the Exception and come
+      // back in an hour to try again
+      case NonFatal(e) =>
+        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+        TimeUnit.HOURS.toMillis(1)
+    }
+
+    credentialUpdater.schedule(
+      credentialUpdaterRunnable, timeToNextUpdate, TimeUnit.MILLISECONDS)
+  }
+
+  /** Read a token storage file from the remote file system, always closing the stream. */
+  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+    val stream = remoteFs.open(tokenPath)
+    try {
+      val newCredentials = new Credentials()
+      newCredentials.readTokenStorageStream(stream)
+      newCredentials
+    } finally {
+      stream.close()
+    }
+  }
+
+  /**
+   * Extract the time of next update from a credentials file name, i.e. the field between the
+   * last two counter delimiters of the name.
+   */
+  private def getTimeOfNextUpdateFromFileName(credentialsPath: Path): Long = {
+    val name = credentialsPath.getName
+    val index = name.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
+    val slice = name.substring(0, index)
+    val last2index = slice.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM)
+    name.substring(last2index + 1, index).toLong
+  }
+
+  /** Shut down the scheduler that runs the updater task. */
+  def stop(): Unit = {
+    credentialUpdater.shutdown()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
new file mode 100644
index 0000000..5571df0
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+/**
+ * Credential provider for HBase. HBase classes are looked up reflectively, and any non-fatal
+ * failure while doing so is only logged at debug level.
+ */
+private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
+
+  override def serviceName: String = "hbase"
+
+  /**
+   * Try to fetch an HBase token via TokenUtil.obtainToken and add it to `creds`.
+   *
+   * @return always None; this provider does not report a renewal time
+   */
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    try {
+      val loader = universe.runtimeMirror(getClass.getClassLoader).classLoader
+      val tokenUtilClass = loader.loadClass("org.apache.hadoop.hbase.security.token.TokenUtil")
+      val obtainTokenMethod = tokenUtilClass.getMethod("obtainToken", classOf[Configuration])
+
+      logDebug("Attempting to fetch HBase security token.")
+      // obtainToken is invoked without a receiver (null), passing the HBase configuration.
+      val rawToken = obtainTokenMethod.invoke(null, hbaseConf(hadoopConf))
+      val token = rawToken.asInstanceOf[Token[_ <: TokenIdentifier]]
+      logInfo(s"Get token from HBase: ${token.toString}")
+      creds.addToken(token.getService, token)
+    } catch {
+      case NonFatal(e) =>
+        logDebug(s"Failed to get token from service $serviceName", e)
+    }
+
+    None
+  }
+
+  /** Credentials are required only when HBase authentication is set to "kerberos". */
+  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
+    val authentication = hbaseConf(hadoopConf).get("hbase.security.authentication")
+    authentication == "kerberos"
+  }
+
+  /**
+   * Build a configuration through HBaseConfiguration.create(conf), falling back to the given
+   * configuration when that call fails with a non-fatal error.
+   */
+  private def hbaseConf(conf: Configuration): Configuration = {
+    try {
+      val loader = universe.runtimeMirror(getClass.getClassLoader).classLoader
+      val hbaseConfClass = loader.loadClass("org.apache.hadoop.hbase.HBaseConfiguration")
+      val createMethod = hbaseConfClass.getMethod("create", classOf[Configuration])
+      createMethod.invoke(null, conf).asInstanceOf[Configuration]
+    } catch {
+      case NonFatal(e) =>
+        logDebug("Fail to invoke HBaseConfiguration", e)
+        conf
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
new file mode 100644
index 0000000..8d06d73
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProvider.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.io.{ByteArrayInputStream, DataInputStream}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.mapred.Master
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+/**
+ * Credential provider that obtains HDFS delegation tokens for the configured namenodes and the
+ * staging directory, and reports when those tokens next need to be renewed.
+ */
+private[security] class HDFSCredentialProvider extends ServiceCredentialProvider with Logging {
+  // Cached token renewal interval. The outer Option is None until the first call to
+  // obtainCredentials has resolved it. The inner Option is None when no interval could be
+  // determined (no principal configured, or no HDFS delegation token was issued).
+  private var tokenRenewalInterval: Option[Option[Long]] = None
+
+  override val serviceName: String = "hdfs"
+
+  /**
+   * Add delegation tokens for every file system to access into `creds`.
+   *
+   * @return the nearest (smallest) next renewal time among the HDFS delegation tokens in
+   *         `creds`, or None when the renewal interval is unknown or there are no such tokens
+   */
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    // NameNode to access, used to get tokens from different FileSystems
+    nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+      val dstFs = dst.getFileSystem(hadoopConf)
+      logInfo("getting token for namenode: " + dst)
+      dstFs.addDelegationTokens(getTokenRenewer(hadoopConf), creds)
+    }
+
+    // Resolve the renewal interval on the first call only; later calls reuse the cached value.
+    val renewalInterval = tokenRenewalInterval.getOrElse {
+      val resolved = getTokenRenewalInterval(hadoopConf, sparkConf)
+      tokenRenewalInterval = Some(resolved)
+      resolved
+    }
+
+    // Get the time of next renewal: the earliest expiry across all HDFS delegation tokens.
+    renewalInterval.flatMap { interval =>
+      val nextRenewalDates = creds.getAllTokens.asScala
+        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+        .map(t => readIssueDate(t.getIdentifier) + interval)
+      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
+    }
+  }
+
+  /**
+   * Determine the token renewal interval by creating a token with the configured principal as
+   * renewer, renewing it once, and subtracting its issue date from the new expiration.
+   *
+   * @return None when no principal is configured or no HDFS delegation token was issued
+   */
+  private def getTokenRenewalInterval(
+      hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
+    // We cannot use the tokens generated with renewer yarn. Trying to renew
+    // those will fail with an access control issue. So create new tokens with the logged in
+    // user as renewer.
+    sparkConf.get(PRINCIPAL).flatMap { renewer =>
+      val creds = new Credentials()
+      nnsToAccess(hadoopConf, sparkConf).foreach { dst =>
+        val dstFs = dst.getFileSystem(hadoopConf)
+        dstFs.addDelegationTokens(renewer, creds)
+      }
+      // There may be no HDFS delegation token at all; report "unknown" in that case.
+      creds.getAllTokens.asScala
+        .find(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+        .map { t =>
+          val newExpiration = t.renew(hadoopConf)
+          val interval = newExpiration - readIssueDate(t.getIdentifier)
+          logInfo(s"Renewal Interval is $interval")
+          interval
+        }
+    }
+  }
+
+  /** Decode a serialized delegation token identifier and return its issue date. */
+  private def readIssueDate(identifierBytes: Array[Byte]): Long = {
+    val identifier = new DelegationTokenIdentifier()
+    identifier.readFields(new DataInputStream(new ByteArrayInputStream(identifierBytes)))
+    identifier.getIssueDate
+  }
+
+  /**
+   * Return the master principal to use as delegation token renewer.
+   *
+   * @throws SparkException if the principal is null or empty
+   */
+  private def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+
+    delegTokenRenewer
+  }
+
+  /**
+   * Paths whose file systems need tokens: the configured namenodes plus the staging directory,
+   * or the user's home directory when no staging directory is configured.
+   */
+  private def nnsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get(NAMENODES_TO_ACCESS).map(new Path(_)).toSet +
+      sparkConf.get(STAGING_DIR).map(new Path(_))
+        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
new file mode 100644
index 0000000..16d8fc3
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.lang.reflect.UndeclaredThrowableException
+import java.security.PrivilegedExceptionAction
+
+import scala.reflect.runtime.universe
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import 
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging {
+
+  override def serviceName: String = "hive"
+
+  /**
+   * Creates a `HiveConf` on top of `hadoopConf` via reflection.
+   *
+   * If the Hive class cannot be loaded or instantiated (any non-fatal error), the failure is
+   * logged at debug level and `hadoopConf` itself is returned.
+   */
+  private def hiveConf(hadoopConf: Configuration): Configuration = {
+    try {
+      val loader = universe.runtimeMirror(Utils.getContextOrSparkClassLoader).classLoader
+      // HiveConf is a subclass of Hadoop's Configuration, so the new instance can be handled
+      // through the parent type without further reflection.
+      val hiveConfClass = loader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+      // The (Configuration, Class) constructor lets the current configuration be included in
+      // the Hive one. `classOf[Object].getClass` evaluates to `java.lang.Class` itself, which
+      // is the type of that second constructor parameter.
+      val twoArgCtor = hiveConfClass.getDeclaredConstructor(
+        classOf[Configuration], classOf[Object].getClass)
+      twoArgCtor.newInstance(hadoopConf, hiveConfClass).asInstanceOf[Configuration]
+    } catch {
+      case NonFatal(e) =>
+        logDebug("Fail to create Hive Configuration", e)
+        hadoopConf
+    }
+  }
+
+  /**
+   * Credentials are required only when Hadoop security is enabled and a non-blank
+   * `hive.metastore.uris` value is configured.
+   */
+  override def credentialsRequired(hadoopConf: Configuration): Boolean = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Requests a delegation token from Hive and stores it in `creds` under the alias
+   * `hive.server2.delegation.token`.
+   *
+   * A missing metastore principal or URI fails the `require` checks. Any non-fatal error
+   * raised while fetching the token is logged at debug level and otherwise ignored.
+   *
+   * @param hadoopConf base Hadoop configuration.
+   * @param sparkConf Spark configuration (not read by this implementation).
+   * @param creds credentials object that receives the token.
+   * @return always None.
+   */
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    val conf = hiveConf(hadoopConf)
+
+    val principalKey = "hive.metastore.kerberos.principal"
+    val principal = conf.getTrimmed(principalKey, "")
+    require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+    val metastoreUri = conf.getTrimmed("hive.metastore.uris", "")
+    require(metastoreUri.nonEmpty, "Hive metastore uri undefined")
+
+    val currentUser = UserGroupInformation.getCurrentUser()
+    val userName = currentUser.getUserName()
+    logDebug(s"Getting Hive delegation token for $userName against " +
+      s"$principal at $metastoreUri")
+
+    val loader = universe.runtimeMirror(Utils.getContextOrSparkClassLoader).classLoader
+    val hiveClass = loader.loadClass("org.apache.hadoop.hive.ql.metadata.Hive")
+    val hiveConfClass = loader.loadClass("org.apache.hadoop.hive.conf.HiveConf")
+    val closeCurrent = hiveClass.getMethod("closeCurrent")
+
+    try {
+      // Resolve every reflective method handle before any of them is invoked.
+      val getDelegationToken =
+        hiveClass.getMethod("getDelegationToken", classOf[String], classOf[String])
+      val getHive = hiveClass.getMethod("get", hiveConfClass)
+
+      doAsRealUser {
+        val hive = getHive.invoke(null, conf)
+        val encodedToken =
+          getDelegationToken.invoke(hive, userName, principal).asInstanceOf[String]
+        val hive2Token = new Token[DelegationTokenIdentifier]()
+        hive2Token.decodeFromUrlString(encodedToken)
+        logInfo(s"Get Token from hive metastore: ${hive2Token.toString}")
+        creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
+      }
+    } catch {
+      case NonFatal(e) =>
+        logDebug(s"Fail to get token from service $serviceName", e)
+    } finally {
+      // Always invoke the static Hive.closeCurrent, logging (not rethrowing) non-fatal errors.
+      Utils.tryLogNonFatalError {
+        closeCurrent.invoke(null)
+      }
+    }
+
+    None
+  }
+
+  /**
+   * Run some code as the real logged in user (which may differ from the current user, for
+   * example, when using proxying).
+   */
+  private def doAsRealUser[T](fn: => T): T = {
+    val caller = UserGroupInformation.getCurrentUser()
+    // getRealUser() is null-checked here; fall back to the caller itself in that case.
+    val effectiveUser = Option(caller.getRealUser()).getOrElse(caller)
+
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws,
+    // so unwrap it and rethrow the underlying cause when there is one.
+    try {
+      effectiveUser.doAs(new PrivilegedExceptionAction[T]() {
+        override def run(): T = fn
+      })
+    } catch {
+      case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
new file mode 100644
index 0000000..4e3fcce
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+
+/**
+ * A credential provider for a service. Users must implement this trait if they need to
+ * access a secure service from Spark.
+ */
+trait ServiceCredentialProvider {
+
+  /**
+   * Name of the service to provide credentials for. This name should be unique; Spark
+   * internally uses it to differentiate credential providers.
+   */
+  def serviceName: String
+
+  /**
+   * Decides whether credentials are required for this service. By default this is based on
+   * whether Hadoop security is enabled.
+   *
+   * @param hadoopConf Hadoop configuration; not read by the default implementation.
+   */
+  def credentialsRequired(hadoopConf: Configuration): Boolean = {
+    UserGroupInformation.isSecurityEnabled
+  }
+
+  /**
+   * Obtain credentials for this service and get the time of the next renewal.
+   * @param hadoopConf Configuration of the current Hadoop-compatible system.
+   * @param sparkConf Spark configuration.
+   * @param creds Credentials to add tokens and security keys to.
+   * @return If the credential is renewable and can be renewed, the time of the next
+   *         renewal; otherwise None should be returned.
+   */
+  def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
new file mode 100644
index 0000000..6c3556a
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.util.Properties
+
+/**
+ * Exposes methods from the launcher library that are used by the YARN backend.
+ */
+private[spark] object YarnCommandBuilderUtils {
+
+  /** Forwards to `CommandBuilderUtils.quoteForBatchScript`. */
+  def quoteForBatchScript(arg: String): String = CommandBuilderUtils.quoteForBatchScript(arg)
+
+  /**
+   * Forwards to `CommandBuilderUtils.findJarsDir`, passing the first two dot-separated
+   * components of the running Scala library's version number and `true` as the final flag.
+   */
+  def findJarsDir(sparkHome: String): String = {
+    val scalaBinaryVersion = Properties.versionNumberString.split("\\.").take(2).mkString(".")
+    CommandBuilderUtils.findJarsDir(sparkHome, scalaBinaryVersion, true)
+  }
+
+  /**
+   * Adds the perm gen configuration to the list of java options if needed and not yet added.
+   *
+   * Note that the option is added based on the local JVM version; if the node where the
+   * container is running has a different Java version, there's a risk that the option will
+   * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use
+   * Java 7).
+   */
+  def addPermGenSizeOpt(args: ListBuffer[String]): Unit = {
+    val javaView = args.asJava
+    CommandBuilderUtils.addPermGenSizeOpt(javaView)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
new file mode 100644
index 0000000..4ed2852
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * An extension service that can be loaded into a Spark YARN scheduler.
+ * It is a service that can be started and stopped.
+ *
+ * 1. For implementations to be loadable by `SchedulerExtensionServices`,
+ * they must provide an empty (no-argument) constructor.
+ * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
+ * never invoked.
+ */
+trait SchedulerExtensionService {
+
+  /**
+   * Start the extension service. This should be a no-op if
+   * called more than once.
+   * @param binding binding to the Spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit
+
+  /**
+   * Stop the service.
+   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
+   * never invoked.
+   */
+  def stop(): Unit
+}
+
+/**
+ * Binding information for a [[SchedulerExtensionService]].
+ *
+ * The attempt ID will be set if the service is started within a YARN application master;
+ * there is then a different attempt ID for every time that AM is restarted.
+ * When the service binding is instantiated in client mode, there's no attempt ID, as it
+ * lacks this information.
+ * @param sparkContext current Spark context
+ * @param applicationId YARN application ID
+ * @param attemptId YARN attempt ID. This will always be unset in client mode, and always
+ *                  set in cluster mode. Defaults to None.
+ */
+case class SchedulerExtensionServiceBinding(
+    sparkContext: SparkContext,
+    applicationId: ApplicationId,
+    attemptId: Option[ApplicationAttemptId] = None)
+
+/**
+ * Container for [[SchedulerExtensionService]] instances.
+ *
+ * Loads Extension Services from the configuration property
+ * `"spark.yarn.services"`, instantiates and starts them.
+ * When stopped, it stops all child entries.
+ *
+ * The order in which child extension services are started and stopped
+ * is undefined.
+ */
+private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
+    with Logging {
+  // Never assigned anywhere in this class; it is only reported by toString.
+  private var serviceOption: Option[String] = None
+  private var services: List[SchedulerExtensionService] = Nil
+  private val started = new AtomicBoolean(false)
+  private var binding: SchedulerExtensionServiceBinding = _
+
+  /**
+   * Instantiates every configured service through its no-argument constructor and starts
+   * it with the given binding. A second call while already started is ignored with a warning.
+   *
+   * Each instance is recorded before it is started. If a later service fails to load or
+   * start, the exception still propagates, but the services handled so far remain known to
+   * this container and are stopped by `stop()` instead of being leaked.
+   *
+   * @param binding binding to the spark application and YARN
+   */
+  def start(binding: SchedulerExtensionServiceBinding): Unit = {
+    if (started.getAndSet(true)) {
+      logWarning("Ignoring re-entrant start operation")
+    } else {
+      require(binding.sparkContext != null, "Null context parameter")
+      require(binding.applicationId != null, "Null appId parameter")
+      this.binding = binding
+      val sparkContext = binding.sparkContext
+      val appId = binding.applicationId
+      val attemptId = binding.attemptId
+      logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
+
+      // Start from an empty list so a stop()/start() cycle does not keep stale entries.
+      services = Nil
+      sparkContext.conf.get(SCHEDULER_SERVICES).foreach { sClass =>
+        val instance = Utils.classForName(sClass)
+          .newInstance()
+          .asInstanceOf[SchedulerExtensionService]
+        // Record first: SchedulerExtensionService.stop() must succeed even if start() never
+        // completed, so a service that fails part-way through start() is still cleaned up.
+        services = services :+ instance
+        // bind this service
+        instance.start(binding)
+        logInfo(s"Service $sClass started")
+      }
+    }
+  }
+
+  /**
+   * Get the list of services.
+   *
+   * @return a list of services; Nil until the service is started
+   */
+  def getServices: List[SchedulerExtensionService] = services
+
+  /**
+   * Stop the services; idempotent. A failure to stop one service is logged and does not
+   * prevent the remaining services from being stopped.
+   */
+  override def stop(): Unit = {
+    if (started.getAndSet(false)) {
+      logInfo(s"Stopping $this")
+      services.foreach { s =>
+        Utils.tryLogNonFatalError(s.stop())
+      }
+    }
+  }
+
+  override def toString(): String = s"""SchedulerExtensionServices
+    |(serviceOption=$serviceOption,
+    | services=$services,
+    | started=$started)""".stripMargin
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
new file mode 100644
index 0000000..60da356
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments, 
YarnSparkHadoopUtil}
+import org.apache.spark.internal.Logging
+import org.apache.spark.launcher.SparkAppHandle
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+private[spark] class YarnClientSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends YarnSchedulerBackend(scheduler, sc)
+  with Logging {
+
+  private var client: Client = _
+  private var monitorThread: MonitorThread = _
+
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
+  override def start(): Unit = {
+    val driverHost = conf.get("spark.driver.host")
+    val driverPort = conf.get("spark.driver.port")
+    val hostport = s"$driverHost:$driverPort"
+    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
+
+    val argsArrayBuf = ArrayBuffer[String]("--arg", hostport)
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
+    val args = new ClientArguments(argsArrayBuf.toArray)
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
+    client = new Client(args, conf)
+    bindToYarn(client.submitApplication(), None)
+
+    // SPARK-8687: Ensure all necessary properties have already been set before
+    // we initialize our driver scheduler backend, which serves these properties
+    // to the executors
+    super.start()
+    waitForApplication()
+
+    // SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
+    // reads the credentials from HDFS, just like the executors and updates its own
+    // credentials cache.
+    if (conf.contains("spark.yarn.credentials.file")) {
+      YarnSparkHadoopUtil.get.startCredentialUpdater(conf)
+    }
+    monitorThread = asyncMonitorApplication()
+    monitorThread.start()
+  }
+
+  /**
+   * Report the state of the application until it is running.
+   * If the application has finished, failed or been killed in the process, throw an
+   * exception. This assumes both `client` and `appId` have already been set.
+   */
+  private def waitForApplication(): Unit = {
+    assert(client != null && appId.isDefined, "Application has not been submitted yet!")
+    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
+    val alreadyEnded =
+      state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED
+    if (alreadyEnded) {
+      throw new SparkException("Yarn application has already ended! " +
+        "It might have been killed or unable to launch application master.")
+    }
+    if (state == YarnApplicationState.RUNNING) {
+      logInfo(s"Application ${appId.get} has started running.")
+    }
+  }
+
+  /**
+   * We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's
+   * because the SparkContext is being shut down (sc.stop() called by user code), but if
+   * monitorApplication returns, it means the Yarn application finished before sc.stop() was
+   * called, which means we should call sc.stop() here, and we don't allow the monitor to be
+   * interrupted before SparkContext stops successfully.
+   */
+  private class MonitorThread extends Thread {
+    private var allowInterrupt = true
+
+    override def run(): Unit = {
+      try {
+        val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false)
+        logError(s"Yarn application has already exited with state $state!")
+        // From here on stopMonitor() must not interrupt this thread while it stops the context.
+        allowInterrupt = false
+        sc.stop()
+      } catch {
+        case _: InterruptedException => logInfo("Interrupting monitor thread")
+      }
+    }
+
+    /** Interrupts this thread unless it has already begun stopping the SparkContext. */
+    def stopMonitor(): Unit = {
+      if (allowInterrupt) {
+        this.interrupt()
+      }
+    }
+  }
+
+  /**
+   * Creates (but does not start) the thread that monitors the application state.
+   * If the application has exited for any reason, that thread stops the SparkContext.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def asyncMonitorApplication(): MonitorThread = {
+    assert(client != null && appId.isDefined, "Application has not been submitted yet!")
+    val monitor = new MonitorThread
+    monitor.setName("Yarn application state monitor")
+    monitor.setDaemon(true)
+    monitor
+  }
+
+  /**
+   * Stop the scheduler. This assumes `start()` has already been called.
+   */
+  override def stop(): Unit = {
+    assert(client != null, "Attempted to stop this scheduler before starting it!")
+    Option(monitorThread).foreach(_.stopMonitor())
+
+    // Report a final state to the launcher if one is connected. This is needed since in
+    // client mode this backend doesn't let the app monitor loop run to completion, so it does
+    // not report the final state itself.
+    //
+    // Note: there's not enough information at this point to provide a better final state,
+    // so assume the application was successful.
+    client.reportLauncherState(SparkAppHandle.State.FINISHED)
+
+    super.stop()
+    YarnSparkHadoopUtil.get.stopCredentialUpdater()
+    client.stop()
+    logInfo("Stopped")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
new file mode 100644
index 0000000..64cd1bd
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, 
TaskScheduler, TaskSchedulerImpl}
+
+/**
+ * Cluster Manager for creation of Yarn scheduler and backend
+ */
+private[spark] class YarnClusterManager extends ExternalClusterManager {
+
+  /** This manager handles exactly the master URL "yarn". */
+  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
+
+  /**
+   * Picks the task scheduler for the context's deploy mode ("cluster" or "client").
+   *
+   * @throws SparkException for any other deploy mode.
+   */
+  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+    val mode = sc.deployMode
+    if (mode == "cluster") {
+      new YarnClusterScheduler(sc)
+    } else if (mode == "client") {
+      new YarnScheduler(sc)
+    } else {
+      throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
+    }
+  }
+
+  /**
+   * Picks the scheduler backend for the context's deploy mode ("cluster" or "client").
+   * The given scheduler is cast to `TaskSchedulerImpl` only once a known mode is selected.
+   *
+   * @throws SparkException for any other deploy mode.
+   */
+  override def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = {
+    // Evaluated lazily so that an unknown deploy mode fails before any cast is attempted.
+    def schedulerImpl: TaskSchedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
+
+    val mode = sc.deployMode
+    if (mode == "cluster") {
+      new YarnClusterSchedulerBackend(schedulerImpl, sc)
+    } else if (mode == "client") {
+      new YarnClientSchedulerBackend(schedulerImpl, sc)
+    } else {
+      throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
+    }
+  }
+
+  /** Initializes the scheduler (cast to `TaskSchedulerImpl`) with the given backend. */
+  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+    val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl]
+    schedulerImpl.initialize(backend)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
new file mode 100644
index 0000000..96c9151
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.ApplicationMaster
+
+/**
+ * This is a simple extension to ClusterScheduler - to ensure that appropriate 
initialization of
+ * ApplicationMaster, etc is done
+ */
+private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
+
+  logInfo("Created YarnClusterScheduler")
+
+  /**
+   * Notifies the ApplicationMaster that the SparkContext has been initialized, then runs the
+   * parent's post-start hook.
+   */
+  override def postStartHook(): Unit = {
+    ApplicationMaster.sparkContextInitialized(sc)
+    super.postStartHook()
+    logInfo("YarnClusterScheduler.postStartHook done")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 0000000..4f3d5eb
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+private[spark] class YarnClusterSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends YarnSchedulerBackend(scheduler, sc) {
+
+  /**
+   * Binds to YARN using the ApplicationMaster's attempt ID, starts the parent backend and
+   * then records the initial executor target.
+   */
+  override def start(): Unit = {
+    val attemptId = ApplicationMaster.getAttemptId
+    bindToYarn(attemptId.getApplicationId(), Some(attemptId))
+    super.start()
+    totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
+  }
+
+  /**
+   * Builds the "stdout" and "stderr" log URLs of the container this code runs in, from the
+   * NM_HOST and NM_HTTP_PORT environment variables and the YARN HTTP policy.
+   *
+   * @return the two links, or None if anything goes wrong while building them (the error is
+   *         logged at info level).
+   */
+  override def getDriverLogUrls: Option[Map[String, String]] = {
+    try {
+      val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
+      val containerId = YarnSparkHadoopUtil.get.getContainerId
+
+      val nmHost = System.getenv(Environment.NM_HOST.name())
+      val nmHttpPort = System.getenv(Environment.NM_HTTP_PORT.name())
+      val httpAddress = s"$nmHost:$nmHttpPort"
+      // lookup appropriate http scheme for container log urls
+      val yarnHttpPolicy = yarnConf.get(
+        YarnConfiguration.YARN_HTTP_POLICY_KEY,
+        YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)
+      val user = Utils.getCurrentUserName()
+      val httpScheme = yarnHttpPolicy match {
+        case "HTTPS_ONLY" => "https://"
+        case _ => "http://"
+      }
+      val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
+      logDebug(s"Base URL for logs: $baseUrl")
+      Some(Map(
+        "stdout" -> s"$baseUrl/stdout?start=-4096",
+        "stderr" -> s"$baseUrl/stderr?start=-4096"))
+    } catch {
+      case e: Exception =>
+        logInfo("Error while building AM log links, so AM" +
+          " logs link will not appear in application UI", e)
+        None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
new file mode 100644
index 0000000..0293821
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.util.RackResolver
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+/**
+ * A `TaskSchedulerImpl` that resolves the rack of a host through Hadoop's `RackResolver`.
+ */
+private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+
+  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
+  // Raise its logger to WARN, but only when no level has been set on that logger already.
+  locally {
+    val rackResolverLogger = Logger.getLogger(classOf[RackResolver])
+    if (rackResolverLogger.getLevel == null) {
+      rackResolverLogger.setLevel(Level.WARN)
+    }
+  }
+
+  /**
+   * Look up the rack of a host through `RackResolver`.
+   *
+   * @param hostPort host string as accepted by `Utils.parseHostPort`; only the host part is used
+   * @return the network location reported for the host, or None when it is null
+   */
+  override def getRackForHost(hostPort: String): Option[String] = {
+    val (host, _) = Utils.parseHostPort(hostPort)
+    val node = RackResolver.resolve(sc.hadoopConfiguration, host)
+    Option(node.getNetworkLocation)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
new file mode 100644
index 0000000..2f9ea19
--- /dev/null
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
+
+/**
+ * Abstract Yarn scheduler backend that contains common logic
+ * between the client and cluster Yarn scheduler backends.
+ */
+private[spark] abstract class YarnSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+
+  // Use 0.8 as the minimum registered ratio unless the user has explicitly set
+  // spark.scheduler.minRegisteredResourcesRatio, in which case the superclass value is kept.
+  override val minRegisteredRatio =
+    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+      0.8
+    } else {
+      super.minRegisteredRatio
+    }
+
+  /** Number of executors expected; sufficientResourcesRegistered() compares against it. */
+  protected var totalExpectedExecutors = 0
+
+  private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+
+  private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
+    YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
+
+  // Implicit definitions carry an explicit type so that implicit resolution does not depend on
+  // type inference.
+  private implicit val askTimeout: RpcTimeout = RpcUtils.askRpcTimeout(sc.conf)
+
+  /** Application ID. */
+  protected var appId: Option[ApplicationId] = None
+
+  /** Attempt ID. This is unset for client-mode schedulers */
+  private var attemptId: Option[ApplicationAttemptId] = None
+
+  /** Scheduler extension services. */
+  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
+
+  // Flag to specify whether this schedulerBackend should be reset. It is flipped to true the
+  // first time an AM registers, so that any later registration triggers reset().
+  private var shouldResetOnAmRegister = false
+
+  /**
+   * Bind to YARN. This *must* be done before calling [[start()]].
+   *
+   * @param appId YARN application ID
+   * @param attemptId Optional YARN attempt ID
+   */
+  protected def bindToYarn(
+      appId: ApplicationId,
+      attemptId: Option[ApplicationAttemptId]): Unit = {
+    // Record both identifiers; start() refuses to run until the application ID is set.
+    this.attemptId = attemptId
+    this.appId = Some(appId)
+  }
+
+  /**
+   * Start the scheduler extension services, then the backend itself.
+   *
+   * Throws IllegalArgumentException (via `require`) when no application ID has been bound.
+   */
+  override def start(): Unit = {
+    // bindToYarn() must have been called first; fail fast otherwise.
+    require(appId.isDefined, "application ID unset")
+    val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
+    services.start(binding)
+    super.start()
+  }
+
+  /**
+   * Stop the backend, then the scheduler extension services.
+   */
+  override def stop(): Unit = {
+    try {
+      // SPARK-12009: drop the requested executor total to zero first, to prevent the YARN
+      // allocator from requesting replacements for executors stopped by the SchedulerBackend.
+      requestTotalExecutors(0, 0, Map.empty)
+      super.stop()
+    } finally {
+      // The extension services are stopped even when stopping the backend throws.
+      services.stop()
+    }
+  }
+
+  /**
+   * Get the attempt ID for this run, if the cluster manager supports multiple
+   * attempts. Applications run in client mode will not have attempt IDs.
+   * This attempt ID only includes attempt counter, like "1", "2".
+   *
+   * @return The application attempt id, if available.
+   */
+  override def applicationAttemptId(): Option[String] = {
+    // Only the numeric attempt counter is exposed, rendered as a string.
+    for (id <- attemptId) yield id.getAttemptId.toString
+  }
+
+  /**
+   * Get an application ID associated with the job.
+   * This returns the string value of [[appId]] if set, otherwise
+   * the locally-generated ID from the superclass.
+   * @return The application ID
+   */
+  override def applicationId(): String = {
+    appId match {
+      case Some(id) =>
+        id.toString
+      case None =>
+        // Not bound to YARN yet: fall back to the ID provided by the superclass.
+        logWarning("Application ID is not initialized yet.")
+        super.applicationId
+    }
+  }
+
+  /**
+   * Request executors from the ApplicationMaster by specifying the total 
number desired.
+   * This includes executors already pending or running.
+   */
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
+    // The request is sent to the local YarnSchedulerEndpoint, which relays it to the AM.
+    val request = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
+    yarnSchedulerEndpointRef.ask[Boolean](request)
+  }
+
+  /**
+   * Request that the ApplicationMaster kill the specified executors.
+   */
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    // The request is sent to the local YarnSchedulerEndpoint, which relays it to the AM.
+    val request = KillExecutors(executorIds)
+    yarnSchedulerEndpointRef.ask[Boolean](request)
+  }
+
+  /**
+   * Whether the number of registered executors has reached the expected executor count scaled
+   * by `minRegisteredRatio`.
+   */
+  override def sufficientResourcesRegistered(): Boolean = {
+    val registered = totalRegisteredExecutors.get()
+    registered >= totalExpectedExecutors * minRegisteredRatio
+  }
+
+  /**
+   * Add filters to the SparkUI.
+   */
+  private def addWebUIFilter(
+      filterName: String,
+      filterParams: Map[String, String],
+      proxyBase: String): Unit = {
+    // Publish the proxy base through a system property when a non-empty one was supplied.
+    Option(proxyBase).filter(_.nonEmpty).foreach { base =>
+      System.setProperty("spark.ui.proxyBase", base)
+    }
+
+    // A filter is installed only when both its name and its parameters are present.
+    val filterNamed = filterName != null && filterName.nonEmpty
+    val paramsGiven = filterParams != null && filterParams.nonEmpty
+    if (filterNamed && paramsGiven) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      for ((key, value) <- filterParams) {
+        conf.set(s"spark.$filterName.param.$key", value)
+      }
+      // Apply the updated configuration to the handlers of the UI, when there is one.
+      for (ui <- scheduler.sc.ui) {
+        JettyUtils.addFilters(ui.getHandlers, conf)
+      }
+    }
+  }
+
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    // Use the YARN-specific endpoint, which consults the AM when an executor disconnects.
+    val endpoint = new YarnDriverEndpoint(rpcEnv, properties)
+    endpoint
+  }
+
+  /**
+   * Reset the state of SchedulerBackend to the initial state. This happens when the AM has
+   * failed and re-registers itself with the driver. The stale state in the driver should be
+   * cleaned.
+   */
+  override protected def reset(): Unit = {
+    super.reset()
+    // Also reset the executor allocation manager, when the SparkContext has one.
+    for (allocationManager <- sc.executorAllocationManager) {
+      allocationManager.reset()
+    }
+  }
+
+  /**
+   * Override the DriverEndpoint to add extra logic for the case when an 
executor is disconnected.
+   * This endpoint communicates with the executors and queries the AM for an 
executor's exit
+   * status when the executor is disconnected.
+   */
+  private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+      extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    /**
+     * The superclass DriverEndpoint reacts to onDisconnected by assuming the executor was lost
+     * for a bad reason, and removes it immediately.
+     *
+     * On YARN it is crucial to first ask the application master why the executor exited. If,
+     * according to the application master, the executor exited for a reason unrelated to the
+     * running tasks (e.g., preemption), that information is passed down to the TaskSetManager
+     * so that tasks on the lost executor do not count towards a job failure.
+     */
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      // Act only when the address belongs to a known executor that could be disabled.
+      for {
+        executorId <- addressToExecutorId.get(rpcAddress)
+        if disableExecutor(executorId)
+      } {
+        yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+      }
+    }
+  }
+
+  /**
+   * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
+   */
+  private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
+    private var amEndpoint: Option[RpcEndpointRef] = None
+
+    /**
+     * Ask the driver endpoint to remove a disconnected executor, using the loss reason reported
+     * by the AM when one is registered and answers, and a generic SlaveLost otherwise.
+     */
+    private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
+        executorId: String,
+        executorRpcAddress: RpcAddress): Unit = {
+      // Step 1: work out which RemoveExecutor message to send.
+      val removalRequest = amEndpoint match {
+        case None =>
+          logWarning("Attempted to check for an executor loss reason" +
+            " before the AM has registered!")
+          Future.successful(RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
+        case Some(am) =>
+          val lossReason =
+            am.ask[ExecutorLossReason](GetExecutorLossReason(executorId), askTimeout)
+          lossReason
+            .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
+            .recover {
+              // The AM did not answer: fall back to a generic loss reason.
+              case NonFatal(e) =>
+                logWarning(s"Attempted to get executor loss reason" +
+                  s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+                  s" but got no response. Marking as slave lost.", e)
+                RemoveExecutor(executorId, SlaveLost())
+            }(ThreadUtils.sameThread)
+      }
+
+      // Step 2: hand the message to the driver endpoint, logging a failed hand-off.
+      val removalAck = removalRequest.flatMap { message =>
+        driverEndpoint.ask[Boolean](message)
+      }(ThreadUtils.sameThread)
+      removalAck.onFailure {
+        case NonFatal(e) => logError(
+          s"Error requesting driver to remove executor $executorId after disconnection.", e)
+      }(ThreadUtils.sameThread)
+    }
+
+    /**
+     * Handle one-way messages: AM registration, web UI filter installation and executor
+     * removal requests.
+     */
+    override def receive: PartialFunction[Any, Unit] = {
+      case RegisterClusterManager(am) =>
+        logInfo(s"ApplicationMaster registered as $am")
+        amEndpoint = Option(am)
+        if (shouldResetOnAmRegister) {
+          // An AM has registered before, which potentially means that it failed and a new one
+          // registered after the failure. This will only happen in yarn-client mode.
+          reset()
+        } else {
+          // First registration: remember it so that a later one triggers a reset.
+          shouldResetOnAmRegister = true
+        }
+
+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
+
+      case removal @ RemoveExecutor(executorId, reason) =>
+        logWarning(reason.toString)
+        val removalAck = driverEndpoint.ask[Boolean](removal)
+        removalAck.onFailure {
+          case e =>
+            logError("Error requesting driver to remove executor" +
+              s" $executorId for reason $reason", e)
+        }(ThreadUtils.sameThread)
+    }
+
+
+    /**
+     * Handle messages that expect a reply: executor requests and kills are relayed to the AM,
+     * and the last allocated executor ID is answered directly.
+     */
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+      // Relay `message` to the AM and answer the caller with the AM's Boolean reply. When no AM
+      // is registered, log `notRegisteredWarning` and answer false.
+      def relayToAm(message: Any, notRegisteredWarning: String): Unit = amEndpoint match {
+        case Some(am) =>
+          am.ask[Boolean](message).andThen {
+            case Success(b) => context.reply(b)
+            case Failure(NonFatal(e)) =>
+              logError(s"Sending $message to AM was unsuccessful", e)
+              context.sendFailure(e)
+          }(ThreadUtils.sameThread)
+        case None =>
+          logWarning(notRegisteredWarning)
+          context.reply(false)
+      }
+
+      val handler: PartialFunction[Any, Unit] = {
+        case r: RequestExecutors =>
+          relayToAm(r, "Attempted to request executors before the AM has registered!")
+
+        case k: KillExecutors =>
+          relayToAm(k, "Attempted to kill executors before the AM has registered!")
+
+        case RetrieveLastAllocatedExecutorId =>
+          context.reply(currentExecutorIdCounter)
+      }
+      handler
+    }
+
+    /**
+     * Forget the registered AM when the disconnected address is the AM's address.
+     */
+    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+      amEndpoint match {
+        case Some(am) if am.address == remoteAddress =>
+          logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+          amEndpoint = None
+        case _ => // Some other peer disconnected, or no AM is registered: nothing to do.
+      }
+    }
+  }
+}
+
+private[spark] object YarnSchedulerBackend {
+  /** Name under which the YarnSchedulerEndpoint is registered with the RpcEnv. */
+  val ENDPOINT_NAME: String = "YarnScheduler"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
 
b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
new file mode 100644
index 0000000..d0ef5ef
--- /dev/null
+++ 
b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -0,0 +1 @@
+org.apache.spark.deploy.yarn.security.TestCredentialProvider

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/resources/log4j.properties 
b/resource-managers/yarn/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d13454d
--- /dev/null
+++ b/resource-managers/yarn/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=DEBUG, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p 
%c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to