This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 669e8a1  [SPARK-25689][YARN] Make driver, not AM, manage delegation 
tokens.
669e8a1 is described below

commit 669e8a155987995a1a5d49a96b88c05f39e41723
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Mon Jan 7 14:40:08 2019 -0600

    [SPARK-25689][YARN] Make driver, not AM, manage delegation tokens.
    
    This change modifies the behavior of the delegation token code when running
    on YARN, so that the driver controls the renewal, in both client and cluster
    mode. For that, a few different things were changed:
    
    * The AM code only runs code that needs DTs when DTs are available.
    
    In a way, this restores the AM behavior to what it was pre-SPARK-23361, but
    keeping the fix added in that bug. Basically, all the AM code is run in a
    "UGI.doAs()" block; but code that needs to talk to HDFS (basically the
    distributed cache handling code) was delayed to the point where the driver
    is up and running, and thus when valid delegation tokens are available.
    
    * SparkSubmit / ApplicationMaster now handle user login, not the token 
manager.
    
    The previous AM code was relying on the token manager to keep the user
    logged in when keytabs are used. This required some odd APIs in the token
    manager and the AM so that the right UGI was exposed and used in the right
    places.
    
    After this change, the logged in user is handled separately from the token
    manager, so the API was cleaned up, and, as explained above, the whole AM
    runs under the logged in user, which also helps with simplifying some more 
code.
    
    * Distributed cache configs are sent separately to the AM.
    
    Because of the delayed initialization of the cached resources in the AM, it
    became easier to write the cache config to a separate properties file 
instead
    of bundling it with the rest of the Spark config. This also avoids having
    to modify the SparkConf to hide things from the UI.
    
    * Finally, the AM doesn't manage the token manager anymore.
    
    The above changes allow the token manager to be completely handled by the
    driver's scheduler backend code also in YARN mode (whether client or 
cluster),
    making it similar to other RMs. To maintain the fix added in SPARK-23361 
also
    in client mode, the AM now sends an extra message to the driver on 
initialization
    to fetch delegation tokens; and although it might not really be needed, the
    driver also keeps the running AM updated when new tokens are created.
    
    Tested in a kerberized cluster with the same tests used to validate 
SPARK-23361,
    in both client and cluster mode. Also tested with a non-kerberized cluster.
    
    Closes #23338 from vanzin/SPARK-25689.
    
    Authored-by: Marcelo Vanzin <van...@cloudera.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../security/HadoopDelegationTokenManager.scala    | 110 ++++++-----------
 .../security/HiveDelegationTokenProvider.scala     |  16 ++-
 .../cluster/CoarseGrainedClusterMessage.scala      |   3 +
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  40 ++++--
 .../HadoopDelegationTokenManagerSuite.scala        |   8 +-
 .../features/KerberosConfDriverFeatureStep.scala   |   2 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala    |   7 +-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |   7 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      | 135 ++++++++++-----------
 .../deploy/yarn/ApplicationMasterArguments.scala   |   5 +
 .../org/apache/spark/deploy/yarn/Client.scala      | 100 ++++++++-------
 .../apache/spark/deploy/yarn/YarnRMClient.scala    |   8 +-
 .../org/apache/spark/deploy/yarn/config.scala      |  10 --
 .../YARNHadoopDelegationTokenManager.scala         |   7 +-
 .../cluster/YarnClientSchedulerBackend.scala       |   6 +
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  17 ++-
 .../YARNHadoopDelegationTokenManagerSuite.scala    |   2 +-
 17 files changed, 246 insertions(+), 237 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index f7e3dde..d97857a 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
-import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
@@ -39,32 +38,24 @@ import org.apache.spark.util.ThreadUtils
 /**
  * Manager for delegation tokens in a Spark application.
  *
- * This manager has two modes of operation:
- *
- * 1.  When configured with a principal and a keytab, it will make sure 
long-running apps can run
- * without interruption while accessing secured services. It periodically logs 
in to the KDC with
- * user-provided credentials, and contacts all the configured secure services 
to obtain delegation
- * tokens to be distributed to the rest of the application.
- *
- * Because the Hadoop UGI API does not expose the TTL of the TGT, a 
configuration controls how often
- * to check that a relogin is necessary. This is done reasonably often since 
the check is a no-op
- * when the relogin is not yet needed. The check period can be overridden in 
the configuration.
+ * When configured with a principal and a keytab, this manager will make sure 
long-running apps can
+ * run without interruption while accessing secured services. It periodically 
logs in to the KDC
+ * with user-provided credentials, and contacts all the configured secure 
services to obtain
+ * delegation tokens to be distributed to the rest of the application.
  *
  * New delegation tokens are created once 75% of the renewal interval of the 
original tokens has
- * elapsed. The new tokens are sent to the Spark driver endpoint once it's 
registered with the AM.
- * The driver is tasked with distributing the tokens to other processes that 
might need them.
+ * elapsed. The new tokens are sent to the Spark driver endpoint. The driver 
is tasked with
+ * distributing the tokens to other processes that might need them.
  *
- * 2. When operating without an explicit principal and keytab, token renewal 
will not be available.
- * Starting the manager will distribute an initial set of delegation tokens to 
the provided Spark
- * driver, but the app will not get new tokens when those expire.
- *
- * It can also be used just to create delegation tokens, by calling the 
`obtainDelegationTokens`
- * method. This option does not require calling the `start` method, but leaves 
it up to the
- * caller to distribute the tokens that were generated.
+ * This class can also be used just to create delegation tokens, by calling the
+ * `obtainDelegationTokens` method. This option does not require calling the 
`start` method nor
+ * providing a driver reference, but leaves it up to the caller to distribute 
the tokens that were
+ * generated.
  */
 private[spark] class HadoopDelegationTokenManager(
     protected val sparkConf: SparkConf,
-    protected val hadoopConf: Configuration) extends Logging {
+    protected val hadoopConf: Configuration,
+    protected val schedulerRef: RpcEndpointRef) extends Logging {
 
   private val deprecatedProviderEnabledConfigs = List(
     "spark.yarn.security.tokens.%s.enabled",
@@ -85,60 +76,44 @@ private[spark] class HadoopDelegationTokenManager(
     s"${delegationTokenProviders.keys.mkString(", ")}.")
 
   private var renewalExecutor: ScheduledExecutorService = _
-  private val driverRef = new AtomicReference[RpcEndpointRef]()
-
-  /** Set the endpoint used to send tokens to the driver. */
-  def setDriverRef(ref: RpcEndpointRef): Unit = {
-    driverRef.set(ref)
-  }
 
   /** @return Whether delegation token renewal is enabled. */
   def renewalEnabled: Boolean = principal != null
 
   /**
-   * Start the token renewer. Requires a principal and keytab. Upon start, the 
renewer will:
+   * Start the token renewer. Requires a principal and keytab. Upon start, the 
renewer will
+   * obtain delegation tokens for all configured services and send them to the 
driver, and
+   * set up tasks to periodically get fresh tokens as needed.
    *
-   * - log in the configured principal, and set up a task to keep that user's 
ticket renewed
-   * - obtain delegation tokens from all available providers
-   * - send the tokens to the driver, if it's already registered
-   * - schedule a periodic task to update the tokens when needed.
+   * This method requires that a keytab has been provided to Spark, and will 
try to keep the
+   * logged in user's TGT valid while this manager is active.
    *
-   * @return The newly logged in user.
+   * @return New set of delegation tokens created for the configured principal.
    */
-  def start(): UserGroupInformation = {
+  def start(): Array[Byte] = {
     require(renewalEnabled, "Token renewal must be enabled to start the 
renewer.")
+    require(schedulerRef != null, "Token renewal requires a scheduler 
endpoint.")
     renewalExecutor =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal 
Thread")
 
-    val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
-    val ugi = doLogin()
-
-    val tgtRenewalTask = new Runnable() {
-      override def run(): Unit = {
-        ugi.checkTGTAndReloginFromKeytab()
+    val ugi = UserGroupInformation.getCurrentUser()
+    if (ugi.isFromKeytab()) {
+      // In Hadoop 2.x, renewal of the keytab-based login seems to be 
automatic, but in Hadoop 3.x,
+      // it is configurable (see 
hadoop.kerberos.keytab.login.autorenewal.enabled, added in
+      // HADOOP-9567). This task will make sure that the user stays logged in 
regardless of that
+      // configuration's value. Note that checkTGTAndReloginFromKeytab() is a 
no-op if the TGT does
+      // not need to be renewed yet.
+      val tgtRenewalTask = new Runnable() {
+        override def run(): Unit = {
+          ugi.checkTGTAndReloginFromKeytab()
+        }
       }
+      val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
+      renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, 
tgtRenewalPeriod,
+        TimeUnit.SECONDS)
     }
-    val tgtRenewalPeriod = sparkConf.get(KERBEROS_RELOGIN_PERIOD)
-    renewalExecutor.scheduleAtFixedRate(tgtRenewalTask, tgtRenewalPeriod, 
tgtRenewalPeriod,
-      TimeUnit.SECONDS)
 
-    val creds = obtainTokensAndScheduleRenewal(ugi)
-    ugi.addCredentials(creds)
-
-    val driver = driverRef.get()
-    if (driver != null) {
-      val tokens = SparkHadoopUtil.get.serialize(creds)
-      driver.send(UpdateDelegationTokens(tokens))
-    }
-
-    // Transfer the original user's tokens to the new user, since it may 
contain needed tokens
-    // (such as those user to connect to YARN). Explicitly avoid overwriting 
tokens that already
-    // exist in the current user's credentials, since those were freshly 
obtained above
-    // (see SPARK-23361).
-    val existing = ugi.getCredentials()
-    existing.mergeAll(originalCreds)
-    ugi.addCredentials(existing)
-    ugi
+    updateTokensTask()
   }
 
   def stop(): Unit = {
@@ -218,27 +193,22 @@ private[spark] class HadoopDelegationTokenManager(
    * Periodic task to login to the KDC and create new delegation tokens. 
Re-schedules itself
    * to fetch the next set of tokens when needed.
    */
-  private def updateTokensTask(): Unit = {
+  private def updateTokensTask(): Array[Byte] = {
     try {
       val freshUGI = doLogin()
       val creds = obtainTokensAndScheduleRenewal(freshUGI)
       val tokens = SparkHadoopUtil.get.serialize(creds)
 
-      val driver = driverRef.get()
-      if (driver != null) {
-        logInfo("Updating delegation tokens.")
-        driver.send(UpdateDelegationTokens(tokens))
-      } else {
-        // This shouldn't really happen, since the driver should register way 
before tokens expire.
-        logWarning("Delegation tokens close to expiration but no driver has 
registered yet.")
-        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
-      }
+      logInfo("Updating delegation tokens.")
+      schedulerRef.send(UpdateDelegationTokens(tokens))
+      tokens
     } catch {
       case e: Exception =>
         val delay = 
TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
         logWarning(s"Failed to update tokens, will try again in 
${UIUtils.formatDuration(delay)}!" +
           " If this happens too often tasks will fail.", e)
         scheduleRenewal(delay)
+        null
     }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index 90f7051..4ca0136 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -67,11 +67,17 @@ private[spark] class HiveDelegationTokenProvider
     // Other modes (such as client with or without keytab, or cluster mode 
with keytab) do not need
     // a delegation token, since there's a valid kerberos TGT for the right 
user available to the
     // driver, which is the only process that connects to the HMS.
-    val deployMode = sparkConf.get("spark.submit.deployMode", "client")
-    UserGroupInformation.isSecurityEnabled &&
+    //
+    // Note that this means Hive tokens are not re-created periodically by the 
token manager.
+    // This is because HMS connections are only performed by the Spark driver, 
and the driver
+    // either has a TGT, in which case it does not need tokens, or it has a 
token created
+    // elsewhere, in which case it cannot create new ones. The check for an 
existing token avoids
+    // printing an exception to the logs in the latter case.
+    val currentToken = 
UserGroupInformation.getCurrentUser().getCredentials().getToken(tokenAlias)
+    currentToken == null && UserGroupInformation.isSecurityEnabled &&
       hiveConf(hadoopConf).getTrimmed("hive.metastore.uris", "").nonEmpty &&
       (SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser()) 
||
-        (deployMode == "cluster" && !sparkConf.contains(KEYTAB)))
+        (!Utils.isClientMode(sparkConf) && !sparkConf.contains(KEYTAB)))
   }
 
   override def obtainDelegationTokens(
@@ -98,7 +104,7 @@ private[spark] class HiveDelegationTokenProvider
         val hive2Token = new Token[DelegationTokenIdentifier]()
         hive2Token.decodeFromUrlString(tokenStr)
         logDebug(s"Get Token from hive metastore: ${hive2Token.toString}")
-        creds.addToken(new Text("hive.server2.delegation.token"), hive2Token)
+        creds.addToken(tokenAlias, hive2Token)
       }
 
       None
@@ -134,4 +140,6 @@ private[spark] class HiveDelegationTokenProvider
       case e: UndeclaredThrowableException => throw 
Option(e.getCause()).getOrElse(e)
     }
   }
+
+  private def tokenAlias: Text = new Text("hive.server2.delegation.token")
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e8b7fc0..9e768c2 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -104,6 +104,9 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RegisterClusterManager(am: RpcEndpointRef) extends 
CoarseGrainedClusterMessage
 
+  // Used by YARN's client mode AM to retrieve the current set of delegation 
tokens.
+  object RetrieveDelegationTokens extends CoarseGrainedClusterMessage
+
   // Request executors by specifying the new total number of executors desired
   // This includes executors already pending or running
   case class RequestExecutors(
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 329158a..98ed2ff 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -162,11 +162,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         }
 
       case UpdateDelegationTokens(newDelegationTokens) =>
-        SparkHadoopUtil.get.addDelegationTokens(newDelegationTokens, conf)
-        delegationTokens.set(newDelegationTokens)
-        executorDataMap.values.foreach { ed =>
-          ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
-        }
+        updateDelegationTokens(newDelegationTokens)
 
       case RemoveExecutor(executorId, reason) =>
         // We will remove the executor's state and cannot restore it. However, 
the connection
@@ -404,17 +400,18 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint = createDriverEndpointRef(properties)
 
     if (UserGroupInformation.isSecurityEnabled()) {
-      delegationTokenManager = createTokenManager()
+      delegationTokenManager = createTokenManager(driverEndpoint)
       delegationTokenManager.foreach { dtm =>
-        dtm.setDriverRef(driverEndpoint)
-        val creds = if (dtm.renewalEnabled) {
-          dtm.start().getCredentials()
+        val tokens = if (dtm.renewalEnabled) {
+          dtm.start()
         } else {
           val creds = UserGroupInformation.getCurrentUser().getCredentials()
           dtm.obtainDelegationTokens(creds)
-          creds
+          SparkHadoopUtil.get.serialize(creds)
+        }
+        if (tokens != null) {
+          delegationTokens.set(tokens)
         }
-        delegationTokens.set(SparkHadoopUtil.get.serialize(creds))
       }
     }
   }
@@ -716,8 +713,27 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
    * Create the delegation token manager to be used for the application. This 
method is called
    * once during the start of the scheduler backend (so after the object has 
already been
    * fully constructed), only if security is enabled in the Hadoop 
configuration.
+   *
+   * @param schedulerRef RPC endpoint for the scheduler, where updated 
delegation tokens should be
+   *                     sent.
    */
-  protected def createTokenManager(): Option[HadoopDelegationTokenManager] = 
None
+  protected def createTokenManager(
+      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = 
None
+
+  /**
+   * Called when a new set of delegation tokens is sent to the driver. Child 
classes can override
+   * this method but should always call this implementation, which handles 
token distribution to
+   * executors.
+   */
+  protected def updateDelegationTokens(tokens: Array[Byte]): Unit = {
+    SparkHadoopUtil.get.addDelegationTokens(tokens, conf)
+    delegationTokens.set(tokens)
+    executorDataMap.values.foreach { ed =>
+      ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
+    }
+  }
+
+  protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
 
 }
 
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index def9e62..af7d44b 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -27,7 +27,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite 
{
   private val hadoopConf = new Configuration()
 
   test("default configuration") {
-    val manager = new HadoopDelegationTokenManager(new SparkConf(false), 
hadoopConf)
+    val manager = new HadoopDelegationTokenManager(new SparkConf(false), 
hadoopConf, null)
     assert(manager.isProviderLoaded("hadoopfs"))
     assert(manager.isProviderLoaded("hbase"))
     assert(manager.isProviderLoaded("hive"))
@@ -36,7 +36,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite 
{
 
   test("disable hive credential provider") {
     val sparkConf = new 
SparkConf(false).set("spark.security.credentials.hive.enabled", "false")
-    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
+    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
     assert(manager.isProviderLoaded("hadoopfs"))
     assert(manager.isProviderLoaded("hbase"))
     assert(!manager.isProviderLoaded("hive"))
@@ -47,7 +47,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite 
{
     val sparkConf = new SparkConf(false)
       .set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
       .set("spark.yarn.security.credentials.hive.enabled", "false")
-    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)
+    val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
     assert(!manager.isProviderLoaded("hadoopfs"))
     assert(manager.isProviderLoaded("hbase"))
     assert(!manager.isProviderLoaded("hive"))
@@ -99,7 +99,7 @@ private object NoHiveTest {
 
   def runTest(): Unit = {
     try {
-      val manager = new HadoopDelegationTokenManager(new SparkConf(), new 
Configuration())
+      val manager = new HadoopDelegationTokenManager(new SparkConf(), new 
Configuration(), null)
       require(!manager.isProviderLoaded("hive"))
     } catch {
       case e: Throwable =>
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index 721d7e9..a77e8d4 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -91,7 +91,7 @@ private[spark] class 
KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
   private lazy val delegationTokens: Array[Byte] = {
     if (keytab.isEmpty && existingSecretName.isEmpty) {
       val tokenManager = new 
HadoopDelegationTokenManager(kubernetesConf.sparkConf,
-        SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf))
+        SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
       val creds = UserGroupInformation.getCurrentUser().getCredentials()
       tokenManager.obtainDelegationTokens(creds)
       // If no tokens and no secrets are stored in the credentials, make sure 
nothing is returned,
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index cd29897..e285e20 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -147,8 +147,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
   }
 
-  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
-    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
+  override protected def createTokenManager(
+      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, 
schedulerRef))
   }
 
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: 
Seq[(String, String)])
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index d017451..03cd258 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointAddress
+import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -772,8 +772,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
   }
 
-  override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
-    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
+  override protected def createTokenManager(
+      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, 
schedulerRef))
   }
 
   private def numExecutors(): Int = {
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8dbdac1..1ece7bd 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.{StringUtils => ComStrUtils}
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
@@ -41,7 +42,6 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.metrics.MetricsSystem
@@ -58,6 +58,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   // TODO: Currently, task to container is computed once (TaskSetManager) - 
which need not be
   // optimal as more containers are available. Might need to handle this 
better.
 
+  private val appAttemptId = 
YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
   private val isClusterMode = args.userClass != null
 
   private val sparkConf = new SparkConf()
@@ -99,25 +100,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     }
   }
 
-  private val tokenManager: Option[YARNHadoopDelegationTokenManager] = {
-    sparkConf.get(KEYTAB).map { _ =>
-      new YARNHadoopDelegationTokenManager(sparkConf, yarnConf)
-    }
-  }
-
-  private val ugi = tokenManager match {
-    case Some(tm) =>
-      // Set the context class loader so that the token renewer has access to 
jars distributed
-      // by the user.
-      Utils.withContextClassLoader(userClassLoader) {
-        tm.start()
-      }
-
-    case _ =>
-      SparkHadoopUtil.get.createSparkUser()
-  }
-
-  private val client = doAsUser { new YarnRMClient() }
+  private val client = new YarnRMClient()
 
   // Default to twice the number of executors (twice the maximum number of 
executors if dynamic
   // allocation is enabled), with a minimum of 3.
@@ -174,11 +157,19 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   // In cluster mode, used to tell the AM when the user's SparkContext has 
been initialized.
   private val sparkContextPromise = Promise[SparkContext]()
 
-  // Load the list of localized files set by the client. This is used when 
launching executors,
-  // and is loaded here so that these configs don't pollute the Web UI's 
environment page in
-  // cluster mode.
-  private val localResources = doAsUser {
+  /**
+   * Load the list of localized files set by the client, used when launching 
executors. This should
+   * be called in a context where the needed credentials to access HDFS are 
available.
+   */
+  private def prepareLocalResources(): Map[String, LocalResource] = {
     logInfo("Preparing Local resources")
+    val distCacheConf = new SparkConf(false)
+    if (args.distCacheConf != null) {
+      Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
+        distCacheConf.set(k, v)
+      }
+    }
+
     val resources = HashMap[String, LocalResource]()
 
     def setupDistributedCache(
@@ -199,11 +190,11 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
       resources(fileName) = amJarRsrc
     }
 
-    val distFiles = sparkConf.get(CACHED_FILES)
-    val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
-    val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
-    val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
-    val resTypes = sparkConf.get(CACHED_FILES_TYPES)
+    val distFiles = distCacheConf.get(CACHED_FILES)
+    val fileSizes = distCacheConf.get(CACHED_FILES_SIZES)
+    val timeStamps = distCacheConf.get(CACHED_FILES_TIMESTAMPS)
+    val visibilities = distCacheConf.get(CACHED_FILES_VISIBILITIES)
+    val resTypes = distCacheConf.get(CACHED_FILES_TYPES)
 
     for (i <- 0 to distFiles.size - 1) {
       val resType = LocalResourceType.valueOf(resTypes(i))
@@ -212,7 +203,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     }
 
     // Distribute the conf archive to executors.
-    sparkConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
+    distCacheConf.get(CACHED_CONF_ARCHIVE).foreach { path =>
       val uri = new URI(path)
       val fs = FileSystem.get(uri, yarnConf)
       val status = fs.getFileStatus(new Path(uri))
@@ -225,33 +216,12 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
         LocalResourceVisibility.PRIVATE.name())
     }
 
-    // Clean up the configuration so it doesn't show up in the Web UI (since 
it's really noisy).
-    CACHE_CONFIGS.foreach { e =>
-      sparkConf.remove(e)
-      sys.props.remove(e.key)
-    }
-
     resources.toMap
   }
 
-  def getAttemptId(): ApplicationAttemptId = {
-    client.getAttemptId()
-  }
-
   final def run(): Int = {
-    doAsUser {
-      runImpl()
-    }
-    exitCode
-  }
-
-  private def runImpl(): Unit = {
     try {
-      val appAttemptId = client.getAttemptId()
-
-      var attemptID: Option[String] = None
-
-      if (isClusterMode) {
+      val attemptID = if (isClusterMode) {
         // Set the web ui port to be ephemeral for yarn so we don't conflict 
with
         // other spark processes running on the same box
         System.setProperty("spark.ui.port", "0")
@@ -264,7 +234,9 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
         // configuration will be checked in SparkContext to avoid misuse of 
yarn cluster mode.
         System.setProperty("spark.yarn.app.id", 
appAttemptId.getApplicationId().toString())
 
-        attemptID = Option(appAttemptId.getAttemptId.toString)
+        Option(appAttemptId.getAttemptId.toString)
+      } else {
+        None
       }
 
       new CallerContext(
@@ -277,7 +249,7 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
       val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
       ShutdownHookManager.addShutdownHook(priority) { () =>
         val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
-        val isLastAttempt = client.getAttemptId().getAttemptId() >= 
maxAppAttempts
+        val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts
 
         if (!finished) {
           // The default state of ApplicationMaster is failed if it is invoked 
by shut down hook.
@@ -322,6 +294,8 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
           logWarning("Exception during stopping of the metric system: ", e)
       }
     }
+
+    exitCode
   }
 
   /**
@@ -377,9 +351,6 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
           logDebug("shutting down user thread")
           userClassThread.interrupt()
         }
-        if (!inShutdown) {
-          tokenManager.foreach(_.stop())
-        }
       }
     }
   }
@@ -405,8 +376,8 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
       port: Int,
       _sparkConf: SparkConf,
       uiAddress: Option[String]): Unit = {
-    val appId = client.getAttemptId().getApplicationId().toString()
-    val attemptId = client.getAttemptId().getAttemptId().toString()
+    val appId = appAttemptId.getApplicationId().toString()
+    val attemptId = appAttemptId.getAttemptId().toString()
     val historyAddress = ApplicationMaster
       .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
 
@@ -415,9 +386,20 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
   }
 
   private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: 
SparkConf): Unit = {
-    val appId = client.getAttemptId().getApplicationId().toString()
+    // In client mode, the AM may be restarting after delegation tokens have 
reached their TTL. So
+    // always contact the driver to get the current set of valid tokens, so 
that local resources can
+    // be initialized below.
+    if (!isClusterMode) {
+      val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
+      if (tokens != null) {
+        SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
+      }
+    }
+
+    val appId = appAttemptId.getApplicationId().toString()
     val driverUrl = RpcEndpointAddress(driverRef.address.host, 
driverRef.address.port,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+    val localResources = prepareLocalResources()
 
     // Before we initialize the allocator, let's log the information about how 
executors will
     // be run up front, to avoid printing this out for every single executor 
being launched.
@@ -433,13 +415,12 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     allocator = client.createAllocator(
       yarnConf,
       _sparkConf,
+      appAttemptId,
       driverUrl,
       driverRef,
       securityMgr,
       localResources)
 
-    tokenManager.foreach(_.setDriverRef(driverRef))
-
     // Initialize the AM endpoint *after* the allocator has been initialized. 
This ensures
     // that when the driver sends an initial executor request (e.g. after an 
AM restart),
     // the allocator is ready to service requests.
@@ -755,6 +736,9 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
           case None =>
             logWarning("Container allocator is not ready to find executor loss 
reasons yet.")
         }
+
+      case UpdateDelegationTokens(tokens) =>
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -767,12 +751,6 @@ private[spark] class ApplicationMaster(args: 
ApplicationMasterArguments) extends
     }
   }
 
-  private def doAsUser[T](fn: => T): T = {
-    ugi.doAs(new PrivilegedExceptionAction[T]() {
-      override def run: T = fn
-    })
-  }
-
 }
 
 object ApplicationMaster extends Logging {
@@ -793,7 +771,24 @@ object ApplicationMaster extends Logging {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
     master = new ApplicationMaster(amArgs)
-    System.exit(master.run())
+
+    val ugi = master.sparkConf.get(PRINCIPAL) match {
+      case Some(principal) =>
+        val originalCreds = 
UserGroupInformation.getCurrentUser().getCredentials()
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, 
master.sparkConf.get(KEYTAB).orNull)
+        val newUGI = UserGroupInformation.getCurrentUser()
+        // Transfer the original user's tokens to the new user, since it may 
contain needed tokens
+        // (such as those user to connect to YARN).
+        newUGI.addCredentials(originalCreds)
+        newUGI
+
+      case _ =>
+        SparkHadoopUtil.get.createSparkUser()
+    }
+
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = System.exit(master.run())
+    })
   }
 
   private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
@@ -801,7 +796,7 @@ object ApplicationMaster extends Logging {
   }
 
   private[spark] def getAttemptId(): ApplicationAttemptId = {
-    master.getAttemptId
+    master.appAttemptId
   }
 
   private[spark] def getHistoryServerAddress(
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index cc76a7c8f..c10206c 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,6 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var primaryRFile: String = null
   var userArgs: Seq[String] = Nil
   var propertiesFile: String = null
+  var distCacheConf: String = null
 
   parseArgs(args.toList)
 
@@ -62,6 +63,10 @@ class ApplicationMasterArguments(val args: Array[String]) {
           propertiesFile = value
           args = tail
 
+        case ("--dist-cache-conf") :: value :: tail =>
+          distCacheConf = value
+          args = tail
+
         case _ =>
           printUsageAndExit(1, args)
       }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 44a60b8..9f09dc0 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -100,21 +100,19 @@ private[spark] class Client(
   }
 
   private val distCacheMgr = new ClientDistributedCacheManager()
+  private val cachedResourcesConf = new SparkConf(false)
 
-  private val principal = sparkConf.get(PRINCIPAL).orNull
   private val keytab = sparkConf.get(KEYTAB).orNull
-  private val loginFromKeytab = principal != null
-  private val amKeytabFileName: String = {
+  private val amKeytabFileName: Option[String] = if (keytab != null && 
isClusterMode) {
+    val principal = sparkConf.get(PRINCIPAL).orNull
     require((principal == null) == (keytab == null),
       "Both principal and keytab must be defined, or neither.")
-    if (loginFromKeytab) {
-      logInfo(s"Kerberos credentials: principal = $principal, keytab = 
$keytab")
-      // Generate a file name that can be used for the keytab file, that does 
not conflict
-      // with any user file.
-      new File(keytab).getName() + "-" + UUID.randomUUID().toString
-    } else {
-      null
-    }
+    logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
+    // Generate a file name that can be used for the keytab file, that does 
not conflict
+    // with any user file.
+    Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString)
+  } else {
+    None
   }
 
   require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should 
reference a local file.")
@@ -220,16 +218,7 @@ private[spark] class Client(
       }
     }
 
-    if (isClusterMode && principal != null && keytab != null) {
-      val newUgi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
-        override def run(): Unit = {
-          cleanupStagingDirInternal()
-        }
-      })
-    } else {
-      cleanupStagingDirInternal()
-    }
+    cleanupStagingDirInternal()
   }
 
   /**
@@ -312,7 +301,7 @@ private[spark] class Client(
    */
   private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
     val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-    val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, 
hadoopConf)
+    val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, 
hadoopConf, null)
     credentialManager.obtainDelegationTokens(credentials)
 
     // When using a proxy user, copy the delegation tokens to the user's 
credentials. Avoid
@@ -496,11 +485,11 @@ private[spark] class Client(
 
     // If we passed in a keytab, make sure we copy the keytab to the staging 
directory on
     // HDFS, and setup the relevant environment vars, so the AM can login 
again.
-    if (loginFromKeytab) {
+    amKeytabFileName.foreach { kt =>
       logInfo("To enable the AM to login from keytab, credentials are being 
copied over to the AM" +
         " via the YARN Secure Distributed Cache.")
       val (_, localizedPath) = distribute(keytab,
-        destName = Some(amKeytabFileName),
+        destName = Some(kt),
         appMasterOnly = true)
       require(localizedPath != null, "Keytab file already distributed.")
     }
@@ -636,7 +625,7 @@ private[spark] class Client(
     // Update the configuration with all the distributed files, minus the conf 
archive. The
     // conf archive will be handled by the AM differently so that we avoid 
having to send
     // this configuration by other means. See SPARK-14602 for one reason of 
why this is needed.
-    distCacheMgr.updateConfiguration(sparkConf)
+    distCacheMgr.updateConfiguration(cachedResourcesConf)
 
     // Upload the conf archive to HDFS manually, and record its location in 
the configuration.
     // This will allow the AM to know where the conf archive is in HDFS, so 
that it can be
@@ -648,7 +637,7 @@ private[spark] class Client(
     // system.
     val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
     val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
-    sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
+    cachedResourcesConf.set(CACHED_CONF_ARCHIVE, 
remoteConfArchivePath.toString())
 
     val localConfArchive = new Path(createConfArchive().toURI())
     copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, 
force = true,
@@ -660,11 +649,6 @@ private[spark] class Client(
       remoteFs, hadoopConf, remoteConfArchivePath, localResources, 
LocalResourceType.ARCHIVE,
       LOCALIZED_CONF_DIR, statCache, appMasterOnly = false)
 
-    // Clear the cache-related entries from the configuration to avoid them 
polluting the
-    // UI's environment page. This works for client mode; for cluster mode, 
this is handled
-    // by the AM.
-    CACHE_CONFIGS.foreach(sparkConf.remove)
-
     localResources
   }
 
@@ -768,19 +752,25 @@ private[spark] class Client(
       hadoopConf.writeXml(confStream)
       confStream.closeEntry()
 
-      // Save Spark configuration to a file in the archive, but filter out the 
app's secret.
-      val props = new Properties()
-      sparkConf.getAll.foreach { case (k, v) =>
-        props.setProperty(k, v)
+      // Save Spark configuration to a file in the archive.
+      val props = confToProperties(sparkConf)
+
+      // If propagating the keytab to the AM, override the keytab name with 
the name of the
+      // distributed file. Otherwise remove princpal/keytab from the conf, so 
they're not seen
+      // by the AM at all.
+      amKeytabFileName match {
+        case Some(kt) =>
+          props.setProperty(KEYTAB.key, kt)
+        case None =>
+          props.remove(PRINCIPAL.key)
+          props.remove(KEYTAB.key)
       }
-      // Override spark.yarn.key to point to the location in distributed cache 
which will be used
-      // by AM.
-      Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) 
}
-      confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
-      val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8)
-      props.store(writer, "Spark configuration.")
-      writer.flush()
-      confStream.closeEntry()
+
+      writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)
+
+      // Write the distributed cache config to the archive.
+      writePropertiesToArchive(confToProperties(cachedResourcesConf), 
DIST_CACHE_CONF_FILE,
+        confStream)
     } finally {
       confStream.close()
     }
@@ -984,7 +974,10 @@ private[spark] class Client(
     }
     val amArgs =
       Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ 
userArgs ++
-      Seq("--properties-file", buildPath(Environment.PWD.$$(), 
LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+      Seq("--properties-file",
+        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) 
++
+      Seq("--dist-cache-conf",
+        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, 
DIST_CACHE_CONF_FILE))
 
     // Command for the ApplicationMaster
     val commands = prefixEnv ++
@@ -1213,6 +1206,9 @@ private object Client extends Logging {
   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
 
+  // Name of the file in the conf archive containing the distributed cache 
info.
+  val DIST_CACHE_CONF_FILE = "__spark_dist_cache__.properties"
+
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
@@ -1512,6 +1508,22 @@ private object Client extends Logging {
     }
     getClusterPath(conf, cmdPrefix)
   }
+
+  def confToProperties(conf: SparkConf): Properties = {
+    val props = new Properties()
+    conf.getAll.foreach { case (k, v) =>
+      props.setProperty(k, v)
+    }
+    props
+  }
+
+  def writePropertiesToArchive(props: Properties, name: String, out: 
ZipOutputStream): Unit = {
+    out.putNextEntry(new ZipEntry(name))
+    val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
+    props.store(writer, "Spark configuration.")
+    writer.flush()
+    out.closeEntry()
+  }
 }
 
 private[spark] class YarnClusterApplication extends SparkApplication {
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 05a7b1e..cf16edf 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -76,12 +76,13 @@ private[spark] class YarnRMClient extends Logging {
   def createAllocator(
       conf: YarnConfiguration,
       sparkConf: SparkConf,
+      appAttemptId: ApplicationAttemptId,
       driverUrl: String,
       driverRef: RpcEndpointRef,
       securityMgr: SecurityManager,
       localResources: Map[String, LocalResource]): YarnAllocator = {
     require(registered, "Must register AM before creating allocator.")
-    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, 
getAttemptId(), securityMgr,
+    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, 
appAttemptId, securityMgr,
       localResources, new SparkRackResolver())
   }
 
@@ -100,11 +101,6 @@ private[spark] class YarnRMClient extends Logging {
     }
   }
 
-  /** Returns the attempt ID. */
-  def getAttemptId(): ApplicationAttemptId = {
-    YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
-  }
-
   /** Returns the configuration for the AmIpFilter to add to the Spark UI. */
   def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): 
Map[String, String] = {
     // Figure out which scheme Yarn is using. Note the method seems to have 
been added after 2.2,
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 7e9cd40..6091cd4 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -321,16 +321,6 @@ package object config {
     .stringConf
     .createOptional
 
-  // The list of cache-related config entries. This is used by Client and the 
AM to clean
-  // up the environment so that these settings do not appear on the web UI.
-  private[yarn] val CACHE_CONFIGS = Seq(
-    CACHED_FILES,
-    CACHED_FILES_SIZES,
-    CACHED_FILES_TIMESTAMPS,
-    CACHED_FILES_VISIBILITIES,
-    CACHED_FILES_TYPES,
-    CACHED_CONF_ARCHIVE)
-
   /* YARN allocator-level blacklisting related config entries. */
   private[spark] val YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED =
     ConfigBuilder("spark.yarn.blacklist.executor.launch.blacklisting.enabled")
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
index 2d9a3f0..bb40ea8 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -36,10 +36,11 @@ import org.apache.spark.util.Utils
  * [[ServiceCredentialProvider]] interface, as well as the builtin providers 
defined
  * in [[HadoopDelegationTokenManager]].
  */
-private[yarn] class YARNHadoopDelegationTokenManager(
+private[spark] class YARNHadoopDelegationTokenManager(
     _sparkConf: SparkConf,
-    _hadoopConf: Configuration)
-  extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf) {
+    _hadoopConf: Configuration,
+    _schedulerRef: RpcEndpointRef)
+  extends HadoopDelegationTokenManager(_sparkConf, _hadoopConf, _schedulerRef) 
{
 
   private val credentialProviders = {
     ServiceLoader.load(classOf[ServiceCredentialProvider], 
Utils.getContextOrSparkClassLoader)
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 167eef1..934fba3 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 
 private[spark] class YarnClientSchedulerBackend(
     scheduler: TaskSchedulerImpl,
@@ -166,4 +167,9 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }
 
+  override protected def updateDelegationTokens(tokens: Array[Byte]): Unit = {
+    super.updateDelegationTokens(tokens)
+    amEndpoint.foreach(_.send(UpdateDelegationTokens(tokens)))
+  }
+
 }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1289d4b..6357d4a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -27,6 +27,8 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -55,6 +57,7 @@ private[spark] abstract class YarnSchedulerBackend(
   protected var totalExpectedExecutors = 0
 
   private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+  protected var amEndpoint: Option[RpcEndpointRef] = None
 
   private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
@@ -191,6 +194,11 @@ private[spark] abstract class YarnSchedulerBackend(
     sc.executorAllocationManager.foreach(_.reset())
   }
 
+  override protected def createTokenManager(
+      schedulerRef: RpcEndpointRef): Option[HadoopDelegationTokenManager] = {
+    Some(new YARNHadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration, 
schedulerRef))
+  }
+
   /**
    * Override the DriverEndpoint to add extra logic for the case when an 
executor is disconnected.
    * This endpoint communicates with the executors and queries the AM for an 
executor's exit
@@ -226,7 +234,6 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
 
     private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
         executorId: String,
@@ -266,11 +273,6 @@ private[spark] abstract class YarnSchedulerBackend(
           logWarning(s"Requesting driver to remove executor $executorId for 
reason $reason")
           driverEndpoint.send(r)
         }
-
-      case u @ UpdateDelegationTokens(tokens) =>
-        // Add the tokens to the current user and send a message to the 
scheduler so that it
-        // notifies all registered executors of the new tokens.
-        driverEndpoint.send(u)
     }
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
@@ -304,6 +306,9 @@ private[spark] abstract class YarnSchedulerBackend(
 
       case RetrieveLastAllocatedExecutorId =>
         context.reply(currentExecutorIdCounter)
+
+      case RetrieveDelegationTokens =>
+        context.reply(currentDelegationTokens)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
index 98315e4..f00453c 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -34,7 +34,7 @@ class YARNHadoopDelegationTokenManagerSuite extends 
SparkFunSuite {
   }
 
   test("Correctly loads credential providers") {
-    credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, 
hadoopConf)
+    credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, 
hadoopConf, null)
     assert(credentialManager.isProviderLoaded("yarn-test"))
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to