This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b3fe3a  [SPARK-25815][K8S] Support kerberos in client mode, keytab-based token renewal.
4b3fe3a is described below

commit 4b3fe3a9ccc8a4a8eb0d037d19cb07a8a288e37a
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Tue Dec 18 13:30:09 2018 -0800

    [SPARK-25815][K8S] Support kerberos in client mode, keytab-based token renewal.
    
    This change hooks up the k8s backend to the updated HadoopDelegationTokenManager,
    so that delegation tokens are also available in client mode, and keytab-based
    token renewal is enabled.
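
    As a rough, condensed illustration (not part of the patch itself), the
    scheduler-side hook-up boils down to the override shown in the
    KubernetesClusterSchedulerBackend hunk further down:

        // Sketch, mirroring the diff below: the shared token manager now
        // drives token creation and keytab-based renewal for k8s too.
        override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
          Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
        }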
    
    The change re-works the k8s feature steps related to kerberos so
    that the driver does all the credential management and provides all
    the needed information to executors - so nothing needs to be added
    to executor pods. This also makes cluster mode behave much more like
    client mode, since no driver-related config steps are run in client
    mode.
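
    A minimal sketch of the pattern the re-worked driver steps use (built on
    the new SparkPod.transform helper added in the diff below), where
    hasHadoopConf stands in for whatever condition a given step checks:

        // Sketch only: a step rewrites the pod when its feature is configured;
        // when the partial function does not match, transform() returns the
        // original pod untouched.
        def configurePod(original: SparkPod): SparkPod =
          original.transform {
            case pod if hasHadoopConf =>
              // build and return the modified SparkPod here
              pod
          }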
    
    The two main things that no longer need to happen in executors are:
    
    - adding the Hadoop config to the executor pods: this is not needed
      since the Spark driver will serialize the Hadoop config and send
      it to executors when running tasks.
    
    - mounting the kerberos config file in the executor pods: this is
      not needed once you remove the above. The Hadoop conf sent by
      the driver with the tasks is already resolved (i.e. has all the
      kerberos names properly defined), so executors do not need access
      to the kerberos realm information anymore.
    
    The change also avoids creating delegation tokens unnecessarily:
    they are only created if neither a secret with tokens nor a keytab
    is provided. When tokens are created, the driver code handles them:
    in cluster mode by creating a secret and stashing them there, and in
    client mode by using the existing mechanisms to send DTs to
    executors.
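
    Condensed from the KerberosConfDriverFeatureStep hunk further down, that
    decision roughly looks like this (sparkConf / hadoopConf stand in for the
    step's actual fields):

        // Tokens are only obtained when neither an existing DT secret nor a
        // keytab is available; otherwise the driver manages credentials itself.
        private lazy val delegationTokens: Array[Byte] = {
          if (keytab.isEmpty && existingSecretName.isEmpty) {
            val creds = UserGroupInformation.getCurrentUser().getCredentials()
            new HadoopDelegationTokenManager(sparkConf, hadoopConf)
              .obtainDelegationTokens(creds)
            // Avoid creating an unnecessary, empty secret.
            if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
              SparkHadoopUtil.get.serialize(creds)
            } else {
              null
            }
          } else {
            null
          }
        }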
    
    One last feature: the change also allows defining a keytab with
    a "local:" URI. This is supported in client mode (although there it's
    the same as not using "local:"), and in k8s cluster mode. This
    allows the keytab to be mounted into the driver container from a
    pre-existing secret, for example.
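
    The "local:" handling itself is small; condensed from the SparkSubmit.scala
    hunk below:

        // In client mode a "local:" keytab is simply a path on the submitting
        // machine, so the scheme is stripped; in k8s cluster mode the URI is
        // kept and resolved inside the driver container.
        if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
          args.keytab = new URI(args.keytab).getPath()
        }
        if (!Utils.isLocalUri(args.keytab)) {
          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
        }

    So, for example, passing something like
    --keytab local:/mnt/secrets/kerberos-keytab/spark.keytab (a hypothetical
    mount path) tells the driver to use a keytab that is already present in
    its own container.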
    
    Finally, the new code always sets SPARK_USER in the driver and
    executor pods. This is in line with how other resource managers
    behave: the submitting user reflects which user will access
    Hadoop services in the app. (With kerberos, that's overridden
    by the logged-in user.) That user is unrelated to the OS user
    the app is running as inside the containers.
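
    The relevant builder fragment (taken from the BasicDriverFeatureStep and
    BasicExecutorFeatureStep hunks below) is simply:

        // Both driver and executor containers carry SPARK_USER, derived from
        // the submitting (Hadoop) user rather than the container's OS user.
        .addNewEnv()
          .withName(ENV_SPARK_USER)
          .withValue(Utils.getCurrentUserName())
          .endEnv()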
    
    Tested:
    - client and cluster mode with kinit
    - cluster mode with keytab
    - cluster mode with local: keytab
    - YARN cluster with keytab (to make sure it isn't broken)
    
    Closes #22911 from vanzin/SPARK-25815.
    
    Authored-by: Marcelo Vanzin <van...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      |  29 +-
 .../security/HadoopDelegationTokenManager.scala    |   8 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |   8 +
 .../org/apache/spark/deploy/k8s/Constants.scala    |   9 +-
 .../apache/spark/deploy/k8s/KubernetesConf.scala   |   4 -
 .../org/apache/spark/deploy/k8s/SparkPod.scala     |  25 +-
 .../k8s/features/BasicDriverFeatureStep.scala      |   4 +
 .../k8s/features/BasicExecutorFeatureStep.scala    |   4 +
 .../k8s/features/HadoopConfDriverFeatureStep.scala | 124 ++++++++
 .../features/HadoopConfExecutorFeatureStep.scala   |  40 ---
 .../HadoopSparkUserExecutorFeatureStep.scala       |  35 ---
 .../features/KerberosConfDriverFeatureStep.scala   | 315 ++++++++++++---------
 .../features/KerberosConfExecutorFeatureStep.scala |  46 ---
 .../features/hadooputils/HadoopBootstrapUtil.scala | 283 ------------------
 .../features/hadooputils/KerberosConfigSpec.scala  |  33 ---
 .../k8s/submit/KubernetesDriverBuilder.scala       |   1 +
 .../k8s/KubernetesClusterSchedulerBackend.scala    |   7 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala    |   5 +-
 .../k8s/features/BasicDriverFeatureStepSuite.scala |   3 +-
 .../features/BasicExecutorFeatureStepSuite.scala   |   9 +-
 .../HadoopConfDriverFeatureStepSuite.scala         |  71 +++++
 .../KerberosConfDriverFeatureStepSuite.scala       | 171 +++++++++++
 .../k8s/features/KubernetesFeaturesTestUtils.scala |   6 +
 .../org/apache/spark/deploy/yarn/Client.scala      |  24 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala |   6 +-
 25 files changed, 649 insertions(+), 621 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index d4055cb..763bd0a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, 
UndeclaredThrowableException}
-import java.net.URL
+import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
 import java.util.UUID
@@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
     val hadoopConf = 
conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()
 
-    // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || 
isKubernetesCluster) {
-      if (args.principal != null) {
-        if (args.keytab != null) {
-          require(new File(args.keytab).exists(), s"Keytab file: 
${args.keytab} does not exist")
-          // Add keytab and principal configurations in sysProps to make them 
available
-          // for later use; e.g. in spark sql, the isolated class loader used 
to talk
-          // to HiveMetastore will use these settings. They will be set as 
Java system
-          // properties and then loaded by SparkConf
-          sparkConf.set(KEYTAB, args.keytab)
-          sparkConf.set(PRINCIPAL, args.principal)
-          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-        }
+    // Kerberos is not supported in standalone mode, and keytab support is not 
yet available
+    // in Mesos cluster mode.
+    if (clusterManager != STANDALONE
+        && !isMesosCluster
+        && args.principal != null
+        && args.keytab != null) {
+      // If client mode, make sure the keytab is just a local path.
+      if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
+        args.keytab = new URI(args.keytab).getPath()
+      }
+
+      if (!Utils.isLocalUri(args.keytab)) {
+        require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} 
does not exist")
+        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
       }
     }
 
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 126a6ab..f7e3dde 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.security
 
 import java.io.File
+import java.net.URI
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
@@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
   private val providerEnabledConfig = "spark.security.credentials.%s.enabled"
 
   private val principal = sparkConf.get(PRINCIPAL).orNull
-  private val keytab = sparkConf.get(KEYTAB).orNull
+
+  // The keytab can be a local: URI for cluster mode, so translate it to a 
regular path. If it is
+  // needed later on, the code will check that it exists.
+  private val keytab = sparkConf.get(KEYTAB).map { uri => new 
URI(uri).getPath() }.orNull
 
   require((principal == null) == (keytab == null),
     "Both principal and keytab must be defined, or neither.")
-  require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at 
$keytab.")
 
   private val delegationTokenProviders = loadProviders()
   logDebug("Using the following builtin delegation token providers: " +
@@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def doLogin(): UserGroupInformation = {
     logInfo(s"Attempting to login to KDC using principal: $principal")
+    require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
     val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
keytab)
     logInfo("Successfully logged into KDC.")
     ugi
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 143abd3..f322e92 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,6 +92,9 @@ private[spark] object Utils extends Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
+  /** Scheme used for files that are locally available on worker nodes in the 
cluster. */
+  val LOCAL_SCHEME = "local"
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -2829,6 +2832,11 @@ private[spark] object Utils extends Logging {
   def isClientMode(conf: SparkConf): Boolean = {
     "client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
   }
+
+  /** Returns whether the URI is a "local:" URI. */
+  def isLocalUri(uri: String): Boolean = {
+    uri.startsWith(s"$LOCAL_SCHEME:")
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 85917b8..76041e7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -87,25 +87,22 @@ private[spark] object Constants {
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
 
   // Hadoop Configuration
-  val HADOOP_FILE_VOLUME = "hadoop-properties"
+  val HADOOP_CONF_VOLUME = "hadoop-properties"
   val KRB_FILE_VOLUME = "krb5-file"
   val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
   val KRB_FILE_DIR_PATH = "/etc"
   val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
   val HADOOP_CONFIG_MAP_NAME =
     "spark.kubernetes.executor.hadoopConfigMapName"
-  val KRB5_CONFIG_MAP_NAME =
-    "spark.kubernetes.executor.krb5ConfigMapName"
 
   // Kerberos Configuration
-  val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
   val KERBEROS_DT_SECRET_NAME =
     "spark.kubernetes.kerberos.dt-secret-name"
   val KERBEROS_DT_SECRET_KEY =
     "spark.kubernetes.kerberos.dt-secret-key"
-  val KERBEROS_SPARK_USER_NAME =
-    "spark.kubernetes.kerberos.spark-user-name"
   val KERBEROS_SECRET_KEY = "hadoop-tokens"
+  val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab"
+  val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab"
 
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index a06c21b..6febad9 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -42,10 +42,6 @@ private[spark] abstract class KubernetesConf(val sparkConf: 
SparkConf) {
 
   def appName: String = get("spark.app.name", "spark")
 
-  def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"
-
-  def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"
-
   def namespace: String = get(KUBERNETES_NAMESPACE)
 
   def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
index 345dd11..fd11963 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, 
PodBuilder}
 
-private[spark] case class SparkPod(pod: Pod, container: Container)
+private[spark] case class SparkPod(pod: Pod, container: Container) {
+
+  /**
+   * Convenience method to apply a series of chained transformations to a pod.
+   *
+   * Use it like:
+   *
+   *     original.transform { case pod =>
+   *       // update pod and return new one
+   *     }.transform { case pod =>
+   *       // more changes that create a new pod
+   *     }.transform {
+   *       case pod if someCondition => // new pod
+   *     }
+   *
+   * This makes it cleaner to apply multiple transformations, avoiding having 
to create
+   * a bunch of awkwardly-named local variables. Since the argument is a 
partial function,
+   * it can do matching without needing to exhaust all the possibilities. If 
the function
+   * is not applied, then the original pod will be kept.
+   */
+  def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = 
fn.lift(this).getOrElse(this)
+
+}
+
 
 private[spark] object SparkPod {
   def initialPod(): SparkPod = {
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index d8cf365..8362c14 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -110,6 +110,10 @@ private[spark] class BasicDriverFeatureStep(conf: 
KubernetesDriverConf)
         .withContainerPort(driverUIPort)
         .withProtocol("TCP")
         .endPort()
+      .addNewEnv()
+        .withName(ENV_SPARK_USER)
+        .withValue(Utils.getCurrentUserName())
+        .endEnv()
       .addAllToEnv(driverCustomEnvs.asJava)
       .addNewEnv()
         .withName(ENV_DRIVER_BIND_ADDRESS)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 4bcf4c9..c8bf7cd 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -163,6 +163,10 @@ private[spark] class BasicExecutorFeatureStep(
         .addToLimits("memory", executorMemoryQuantity)
         .addToRequests("cpu", executorCpuQuantity)
         .endResources()
+        .addNewEnv()
+          .withName(ENV_SPARK_USER)
+          .withValue(Utils.getCurrentUserName())
+          .endEnv()
       .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .addToArgs("executor")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
new file mode 100644
index 0000000..d602ed5
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+/**
+ * Mounts the Hadoop configuration - either a pre-defined config map, or a 
local configuration
+ * directory - on the driver pod.
+ */
+private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
+  extends KubernetesFeatureConfigStep {
+
+  private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
+  private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
+
+  KubernetesUtils.requireNandDefined(
+    confDir,
+    existingConfMap,
+    "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " 
+
+    "as the creation of an additional ConfigMap, when one is already specified 
is extraneous")
+
+  private lazy val confFiles: Seq[File] = {
+    val dir = new File(confDir.get)
+    if (dir.isDirectory) {
+      dir.listFiles.filter(_.isFile).toSeq
+    } else {
+      Nil
+    }
+  }
+
+  private def newConfigMapName: String = 
s"${conf.resourceNamePrefix}-hadoop-config"
+
+  private def hasHadoopConf: Boolean = confDir.isDefined || 
existingConfMap.isDefined
+
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hasHadoopConf =>
+      val confVolume = if (confDir.isDefined) {
+        val keyPaths = confFiles.map { file =>
+          new KeyToPathBuilder()
+            .withKey(file.getName())
+            .withPath(file.getName())
+            .build()
+        }
+        new VolumeBuilder()
+          .withName(HADOOP_CONF_VOLUME)
+          .withNewConfigMap()
+            .withName(newConfigMapName)
+            .withItems(keyPaths.asJava)
+            .endConfigMap()
+          .build()
+      } else {
+        new VolumeBuilder()
+          .withName(HADOOP_CONF_VOLUME)
+          .withNewConfigMap()
+            .withName(existingConfMap.get)
+            .endConfigMap()
+          .build()
+      }
+
+      val podWithConf = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(confVolume)
+            .endVolume()
+          .endSpec()
+          .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(HADOOP_CONF_VOLUME)
+          .withMountPath(HADOOP_CONF_DIR_PATH)
+          .endVolumeMount()
+        .addNewEnv()
+          .withName(ENV_HADOOP_CONF_DIR)
+          .withValue(HADOOP_CONF_DIR_PATH)
+          .endEnv()
+        .build()
+
+      SparkPod(podWithConf, containerWithMount)
+    }
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    if (confDir.isDefined) {
+      val fileMap = confFiles.map { file =>
+        (file.getName(), Files.toString(file, StandardCharsets.UTF_8))
+      }.toMap.asJava
+
+      Seq(new ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(newConfigMapName)
+          .endMetadata()
+        .addToData(fileMap)
+        .build())
+    } else {
+      Nil
+    }
+  }
+
+}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
deleted file mode 100644
index da33288..0000000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features
-
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-import org.apache.spark.internal.Logging
-
-/**
- * This step is responsible for bootstraping the container with ConfigMaps
- * containing Hadoop config files mounted as volumes and an ENV variable
- * pointed to the mounted file directory.
- */
-private[spark] class HadoopConfExecutorFeatureStep(conf: 
KubernetesExecutorConf)
-  extends KubernetesFeatureConfigStep with Logging {
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
-    if (hadoopConfDirCMapName.isDefined) {
-      HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, 
hadoopConfDirCMapName, pod)
-    } else {
-      pod
-    }
-  }
-}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
deleted file mode 100644
index c038e75..0000000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features
-
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-
-/**
- * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are 
detected
- * however, this step would not be run if Kerberos is enabled, as Kerberos 
sets SPARK_USER
- */
-private[spark] class HadoopSparkUserExecutorFeatureStep(conf: 
KubernetesExecutorConf)
-  extends KubernetesFeatureConfigStep {
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
-      HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
-    }.getOrElse(pod)
-  }
-}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
index c6d5a86..721d7e9 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -16,31 +16,40 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.{HasMetadata, Secret, SecretBuilder}
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
 import org.apache.commons.codec.binary.Base64
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
 
 /**
- * Runs the necessary Hadoop-based logic based on Kerberos configs and the 
presence of the
- * HADOOP_CONF_DIR. This runs various bootstrap methods defined in 
HadoopBootstrapUtil.
+ * Provide kerberos / service credentials to the Spark driver.
+ *
+ * There are three use cases, in order of precedence:
+ *
+ * - keytab: if a kerberos keytab is defined, it is provided to the driver, 
and the driver will
+ *   manage the kerberos login and the creation of delegation tokens.
+ * - existing tokens: if a secret containing delegation tokens is provided, it 
will be mounted
+ *   on the driver pod, and the driver will handle distribution of those 
tokens to executors.
+ * - tgt only: if Hadoop security is enabled, the local TGT will be used to 
create delegation
+ *   tokens which will be provided to the driver. The driver will handle 
distribution of the
+ *   tokens to executors.
  */
 private[spark] class KerberosConfDriverFeatureStep(kubernetesConf: 
KubernetesDriverConf)
-  extends KubernetesFeatureConfigStep {
-
-  private val hadoopConfDir = 
Option(kubernetesConf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
-  private val hadoopConfigMapName = 
kubernetesConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
-  KubernetesUtils.requireNandDefined(
-    hadoopConfDir,
-    hadoopConfigMapName,
-    "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " 
+
-    "as the creation of an additional ConfigMap, when one is already specified 
is extraneous")
+  extends KubernetesFeatureConfigStep with Logging {
 
   private val principal = 
kubernetesConf.get(org.apache.spark.internal.config.PRINCIPAL)
   private val keytab = 
kubernetesConf.get(org.apache.spark.internal.config.KEYTAB)
@@ -49,15 +58,6 @@ private[spark] class 
KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
   private val krb5File = kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_FILE)
   private val krb5CMap = 
kubernetesConf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
   private val hadoopConf = 
SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf)
-  private val tokenManager = new 
HadoopDelegationTokenManager(kubernetesConf.sparkConf, hadoopConf)
-  private val isKerberosEnabled =
-    (hadoopConfDir.isDefined && UserGroupInformation.isSecurityEnabled) ||
-      (hadoopConfigMapName.isDefined && (krb5File.isDefined || 
krb5CMap.isDefined))
-  require(keytab.isEmpty || isKerberosEnabled,
-    "You must enable Kerberos support if you are specifying a Kerberos Keytab")
-
-  require(existingSecretName.isEmpty || isKerberosEnabled,
-    "You must enable Kerberos support if you are specifying a Kerberos Secret")
 
   KubernetesUtils.requireNandDefined(
     krb5File,
@@ -79,128 +79,183 @@ private[spark] class 
KerberosConfDriverFeatureStep(kubernetesConf: KubernetesDri
     "If a secret storing a Kerberos Delegation Token is specified you must 
also" +
       " specify the item-key where the data is stored")
 
-  private val hadoopConfigurationFiles = hadoopConfDir.map { hConfDir =>
-    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
+  if (!hasKerberosConf) {
+    logInfo("You have not specified a krb5.conf file locally or via a 
ConfigMap. " +
+      "Make sure that you have the krb5.conf locally on the driver image.")
   }
-  private val newHadoopConfigMapName =
-    if (hadoopConfigMapName.isEmpty) {
-      Some(kubernetesConf.hadoopConfigMapName)
-    } else {
-      None
-    }
 
-  // Either use pre-existing secret or login to create new Secret with DT 
stored within
-  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
-    secretName <- existingSecretName
-    secretItemKey <- existingSecretItemKey
-  } yield {
-    KerberosConfigSpec(
-      dtSecret = None,
-      dtSecretName = secretName,
-      dtSecretItemKey = secretItemKey,
-      jobUserName = UserGroupInformation.getCurrentUser.getShortUserName)
-  }).orElse(
-    if (isKerberosEnabled) {
-      Some(buildKerberosSpec())
+  // Create delegation tokens if needed. This is a lazy val so that it's not 
populated
+  // unnecessarily. But it needs to be accessible to different methods in this 
class,
+  // since it's not clear based solely on available configuration options that 
delegation
+  // tokens are needed when other credentials are not available.
+  private lazy val delegationTokens: Array[Byte] = {
+    if (keytab.isEmpty && existingSecretName.isEmpty) {
+      val tokenManager = new 
HadoopDelegationTokenManager(kubernetesConf.sparkConf,
+        SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf))
+      val creds = UserGroupInformation.getCurrentUser().getCredentials()
+      tokenManager.obtainDelegationTokens(creds)
+      // If no tokens and no secrets are stored in the credentials, make sure 
nothing is returned,
+      // to avoid creating an unnecessary secret.
+      if (creds.numberOfTokens() > 0 || creds.numberOfSecretKeys() > 0) {
+        SparkHadoopUtil.get.serialize(creds)
+      } else {
+        null
+      }
     } else {
-      None
+      null
     }
-  )
+  }
 
-  override def configurePod(pod: SparkPod): SparkPod = {
-    if (!isKerberosEnabled) {
-      return pod
-    }
+  private def needKeytabUpload: Boolean = keytab.exists(!Utils.isLocalUri(_))
 
-    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
-      hadoopConfDir,
-      newHadoopConfigMapName,
-      hadoopConfigMapName,
-      pod)
-    kerberosConfSpec.map { hSpec =>
-      HadoopBootstrapUtil.bootstrapKerberosPod(
-        hSpec.dtSecretName,
-        hSpec.dtSecretItemKey,
-        hSpec.jobUserName,
-        krb5File,
-        Some(kubernetesConf.krbConfigMapName),
-        krb5CMap,
-        hadoopBasedSparkPod)
-    }.getOrElse(
-      HadoopBootstrapUtil.bootstrapSparkUserPod(
-        UserGroupInformation.getCurrentUser.getShortUserName,
-        hadoopBasedSparkPod))
-  }
+  private def dtSecretName: String = 
s"${kubernetesConf.resourceNamePrefix}-delegation-tokens"
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = {
-    if (!isKerberosEnabled) {
-      return Map.empty
-    }
+  private def ktSecretName: String = 
s"${kubernetesConf.resourceNamePrefix}-kerberos-keytab"
 
-    val resolvedConfValues = kerberosConfSpec.map { hSpec =>
-      Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
-        KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
-        KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
-        KRB5_CONFIG_MAP_NAME -> 
krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
-      }.getOrElse(
-        Map(KERBEROS_SPARK_USER_NAME ->
-          UserGroupInformation.getCurrentUser.getShortUserName))
-    Map(HADOOP_CONFIG_MAP_NAME ->
-      hadoopConfigMapName.getOrElse(kubernetesConf.hadoopConfigMapName)) ++ 
resolvedConfValues
-  }
+  private def hasKerberosConf: Boolean = krb5CMap.isDefined || krb5File.isDefined
 
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    if (!isKerberosEnabled) {
-      return Seq.empty
-    }
+  private def newConfigMapName: String = 
s"${kubernetesConf.resourceNamePrefix}-krb5-file"
 
-    val hadoopConfConfigMap = for {
-      hName <- newHadoopConfigMapName
-      hFiles <- hadoopConfigurationFiles
-    } yield {
-      HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
-    }
+  override def configurePod(original: SparkPod): SparkPod = {
+    original.transform { case pod if hasKerberosConf =>
+      val configMapVolume = if (krb5CMap.isDefined) {
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+            .withName(krb5CMap.get)
+            .endConfigMap()
+          .build()
+      } else {
+        val krb5Conf = new File(krb5File.get)
+        new VolumeBuilder()
+          .withName(KRB_FILE_VOLUME)
+          .withNewConfigMap()
+          .withName(newConfigMapName)
+          .withItems(new KeyToPathBuilder()
+            .withKey(krb5Conf.getName())
+            .withPath(krb5Conf.getName())
+            .build())
+          .endConfigMap()
+          .build()
+      }
 
-    val krb5ConfigMap = krb5File.map { fileLocation =>
-      HadoopBootstrapUtil.buildkrb5ConfigMap(
-        kubernetesConf.krbConfigMapName,
-        fileLocation)
-    }
+      val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolumeLike(configMapVolume)
+            .endVolume()
+          .endSpec()
+        .build()
+
+      val containerWithMount = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(KRB_FILE_VOLUME)
+          .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
+          .withSubPath("krb5.conf")
+          .endVolumeMount()
+        .build()
+
+      SparkPod(podWithVolume, containerWithMount)
+    }.transform {
+      case pod if needKeytabUpload =>
+        // If keytab is defined and is a submission-local file (not local: 
URI), then create a
+        // secret for it. The keytab data will be stored in this secret below.
+        val podWithKeytab = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(KERBEROS_KEYTAB_VOLUME)
+              .withNewSecret()
+                .withSecretName(ktSecretName)
+                .endSecret()
+              .endVolume()
+            .endSpec()
+          .build()
+
+        val containerWithKeytab = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(KERBEROS_KEYTAB_VOLUME)
+            .withMountPath(KERBEROS_KEYTAB_MOUNT_POINT)
+            .endVolumeMount()
+          .build()
+
+        SparkPod(podWithKeytab, containerWithKeytab)
+
+      case pod if existingSecretName.isDefined || delegationTokens != null =>
+        val secretName = existingSecretName.getOrElse(dtSecretName)
+        val itemKey = existingSecretItemKey.getOrElse(KERBEROS_SECRET_KEY)
+
+        val podWithTokens = new PodBuilder(pod.pod)
+          .editOrNewSpec()
+            .addNewVolume()
+              .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+              .withNewSecret()
+                .withSecretName(secretName)
+                .endSecret()
+              .endVolume()
+            .endSpec()
+          .build()
 
-    val kerberosDTSecret = kerberosConfSpec.flatMap(_.dtSecret)
+        val containerWithTokens = new ContainerBuilder(pod.container)
+          .addNewVolumeMount()
+            .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+            .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
+            .endVolumeMount()
+          .addNewEnv()
+            .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
+            .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$itemKey")
+            .endEnv()
+          .build()
 
-    hadoopConfConfigMap.toSeq ++
-      krb5ConfigMap.toSeq ++
-      kerberosDTSecret.toSeq
+        SparkPod(podWithTokens, containerWithTokens)
+    }
   }
 
-  private def buildKerberosSpec(): KerberosConfigSpec = {
-    // The JobUserUGI will be taken fom the Local Ticket Cache or via 
keytab+principal
-    // The login happens in the SparkSubmit so login logic is not necessary to 
include
-    val jobUserUGI = UserGroupInformation.getCurrentUser
-    val creds = jobUserUGI.getCredentials
-    tokenManager.obtainDelegationTokens(creds)
-    val tokenData = SparkHadoopUtil.get.serialize(creds)
-    require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
-    val newSecretName =
-      
s"${kubernetesConf.resourceNamePrefix}-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
-    val secretDT =
-      new SecretBuilder()
-        .withNewMetadata()
-          .withName(newSecretName)
-          .endMetadata()
-        .addToData(KERBEROS_SECRET_KEY, Base64.encodeBase64String(tokenData))
-        .build()
-    KerberosConfigSpec(
-      dtSecret = Some(secretDT),
-      dtSecretName = newSecretName,
-      dtSecretItemKey = KERBEROS_SECRET_KEY,
-      jobUserName = jobUserUGI.getShortUserName)
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    // If a submission-local keytab is provided, update the Spark config so 
that it knows the
+    // path of the keytab in the driver container.
+    if (needKeytabUpload) {
+      val ktName = new File(keytab.get).getName()
+      Map(KEYTAB.key -> s"$KERBEROS_KEYTAB_MOUNT_POINT/$ktName")
+    } else {
+      Map.empty
+    }
   }
 
-  private case class KerberosConfigSpec(
-      dtSecret: Option[Secret],
-      dtSecretName: String,
-      dtSecretItemKey: String,
-      jobUserName: String)
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    Seq[HasMetadata]() ++ {
+      krb5File.map { path =>
+        val file = new File(path)
+        new ConfigMapBuilder()
+          .withNewMetadata()
+            .withName(newConfigMapName)
+            .endMetadata()
+          .addToData(
+            Map(file.getName() -> Files.toString(file, 
StandardCharsets.UTF_8)).asJava)
+          .build()
+      }
+    } ++ {
+      // If a submission-local keytab is provided, stash it in a secret.
+      if (needKeytabUpload) {
+        val kt = new File(keytab.get)
+        Seq(new SecretBuilder()
+          .withNewMetadata()
+            .withName(ktSecretName)
+            .endMetadata()
+          .addToData(kt.getName(), 
Base64.encodeBase64String(Files.toByteArray(kt)))
+          .build())
+      } else {
+        Nil
+      }
+    } ++ {
+      if (delegationTokens != null) {
+        Seq(new SecretBuilder()
+          .withNewMetadata()
+            .withName(dtSecretName)
+            .endMetadata()
+          .addToData(KERBEROS_SECRET_KEY, 
Base64.encodeBase64String(delegationTokens))
+          .build())
+      } else {
+        Nil
+      }
+    }
+  }
 }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
deleted file mode 100644
index 907271b..0000000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features
-
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
-import org.apache.spark.internal.Logging
-
-/**
- * This step is responsible for mounting the DT secret for the executors
- */
-private[spark] class KerberosConfExecutorFeatureStep(conf: 
KubernetesExecutorConf)
-  extends KubernetesFeatureConfigStep with Logging {
-
-  override def configurePod(pod: SparkPod): SparkPod = {
-    val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
-    if (maybeKrb5CMap.isDefined) {
-      logInfo(s"Mounting Resources for Kerberos")
-      HadoopBootstrapUtil.bootstrapKerberosPod(
-        conf.get(KERBEROS_DT_SECRET_NAME),
-        conf.get(KERBEROS_DT_SECRET_KEY),
-        conf.get(KERBEROS_SPARK_USER_NAME),
-        None,
-        None,
-        maybeKrb5CMap,
-        pod)
-    } else {
-      pod
-    }
-  }
-}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
deleted file mode 100644
index 5bee766..0000000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.hadooputils
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-
-import scala.collection.JavaConverters._
-
-import com.google.common.io.Files
-import io.fabric8.kubernetes.api.model._
-
-import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.SparkPod
-import org.apache.spark.internal.Logging
-
-private[spark] object HadoopBootstrapUtil extends Logging {
-
-  /**
-   * Mounting the DT secret for both the Driver and the executors
-   *
-   * @param dtSecretName Name of the secret that stores the Delegation Token
-   * @param dtSecretItemKey Name of the Item Key storing the Delegation Token
-   * @param userName Name of the SparkUser to set SPARK_USER
-   * @param fileLocation Optional Location of the krb5 file
-   * @param newKrb5ConfName Optional location of the ConfigMap for Krb5
-   * @param existingKrb5ConfName Optional name of ConfigMap for Krb5
-   * @param pod Input pod to be appended to
-   * @return a modified SparkPod
-   */
-  def bootstrapKerberosPod(
-      dtSecretName: String,
-      dtSecretItemKey: String,
-      userName: String,
-      fileLocation: Option[String],
-      newKrb5ConfName: Option[String],
-      existingKrb5ConfName: Option[String],
-      pod: SparkPod): SparkPod = {
-
-    val preConfigMapVolume = existingKrb5ConfName.map { kconf =>
-      new VolumeBuilder()
-        .withName(KRB_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(kconf)
-          .endConfigMap()
-        .build()
-    }
-
-    val createConfigMapVolume = for {
-      fLocation <- fileLocation
-      krb5ConfName <- newKrb5ConfName
-    } yield {
-      val krb5File = new File(fLocation)
-      val fileStringPath = krb5File.toPath.getFileName.toString
-      new VolumeBuilder()
-        .withName(KRB_FILE_VOLUME)
-        .withNewConfigMap()
-        .withName(krb5ConfName)
-        .withItems(new KeyToPathBuilder()
-          .withKey(fileStringPath)
-          .withPath(fileStringPath)
-          .build())
-        .endConfigMap()
-        .build()
-    }
-
-    // Breaking up Volume creation for clarity
-    val configMapVolume = preConfigMapVolume.orElse(createConfigMapVolume)
-    if (configMapVolume.isEmpty) {
-       logInfo("You have not specified a krb5.conf file locally or via a 
ConfigMap. " +
-         "Make sure that you have the krb5.conf locally on the Driver and 
Executor images")
-    }
-
-    val kerberizedPodWithDTSecret = new PodBuilder(pod.pod)
-      .editOrNewSpec()
-        .addNewVolume()
-          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
-          .withNewSecret()
-            .withSecretName(dtSecretName)
-            .endSecret()
-          .endVolume()
-        .endSpec()
-      .build()
-
-    // Optionally add the krb5.conf ConfigMap
-    val kerberizedPod = configMapVolume.map { cmVolume =>
-      new PodBuilder(kerberizedPodWithDTSecret)
-        .editSpec()
-          .addNewVolumeLike(cmVolume)
-            .endVolume()
-          .endSpec()
-        .build()
-    }.getOrElse(kerberizedPodWithDTSecret)
-
-    val kerberizedContainerWithMounts = new ContainerBuilder(pod.container)
-      .addNewVolumeMount()
-        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
-        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
-        .endVolumeMount()
-      .addNewEnv()
-        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
-        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey")
-        .endEnv()
-      .addNewEnv()
-        .withName(ENV_SPARK_USER)
-        .withValue(userName)
-        .endEnv()
-      .build()
-
-    // Optionally add the krb5.conf Volume Mount
-    val kerberizedContainer =
-      if (configMapVolume.isDefined) {
-        new ContainerBuilder(kerberizedContainerWithMounts)
-          .addNewVolumeMount()
-            .withName(KRB_FILE_VOLUME)
-            .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
-            .withSubPath("krb5.conf")
-            .endVolumeMount()
-          .build()
-      } else {
-        kerberizedContainerWithMounts
-      }
-
-    SparkPod(kerberizedPod, kerberizedContainer)
-  }
-
-  /**
-   * setting ENV_SPARK_USER when HADOOP_FILES are detected
-   *
-   * @param sparkUserName Name of the SPARK_USER
-   * @param pod Input pod to be appended to
-   * @return a modified SparkPod
-   */
-  def bootstrapSparkUserPod(sparkUserName: String, pod: SparkPod): SparkPod = {
-    val envModifiedContainer = new ContainerBuilder(pod.container)
-      .addNewEnv()
-        .withName(ENV_SPARK_USER)
-        .withValue(sparkUserName)
-        .endEnv()
-      .build()
-    SparkPod(pod.pod, envModifiedContainer)
-  }
-
-  /**
-   * Grabbing files in the HADOOP_CONF_DIR
-   *
-   * @param path location of HADOOP_CONF_DIR
-   * @return a list of File object
-   */
-  def getHadoopConfFiles(path: String): Seq[File] = {
-    val dir = new File(path)
-    if (dir.isDirectory) {
-      dir.listFiles.filter(_.isFile).toSeq
-    } else {
-      Seq.empty[File]
-    }
-  }
-
-  /**
-   * Bootstraping the container with ConfigMaps that store
-   * Hadoop configuration files
-   *
-   * @param hadoopConfDir directory location of HADOOP_CONF_DIR env
-   * @param newHadoopConfigMapName name of the new configMap for 
HADOOP_CONF_DIR
-   * @param existingHadoopConfigMapName name of the pre-defined configMap for 
HADOOP_CONF_DIR
-   * @param pod Input pod to be appended to
-   * @return a modified SparkPod
-   */
-  def bootstrapHadoopConfDir(
-      hadoopConfDir: Option[String],
-      newHadoopConfigMapName: Option[String],
-      existingHadoopConfigMapName: Option[String],
-      pod: SparkPod): SparkPod = {
-    val preConfigMapVolume = existingHadoopConfigMapName.map { hConf =>
-      new VolumeBuilder()
-        .withName(HADOOP_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(hConf)
-          .endConfigMap()
-        .build() }
-
-    val createConfigMapVolume = for {
-      dirLocation <- hadoopConfDir
-      hConfName <- newHadoopConfigMapName
-    } yield {
-      val hadoopConfigFiles = getHadoopConfFiles(dirLocation)
-      val keyPaths = hadoopConfigFiles.map { file =>
-        val fileStringPath = file.toPath.getFileName.toString
-        new KeyToPathBuilder()
-          .withKey(fileStringPath)
-          .withPath(fileStringPath)
-          .build()
-      }
-      new VolumeBuilder()
-        .withName(HADOOP_FILE_VOLUME)
-        .withNewConfigMap()
-          .withName(hConfName)
-          .withItems(keyPaths.asJava)
-          .endConfigMap()
-        .build()
-    }
-
-    // Breaking up Volume Creation for clarity
-    val configMapVolume = 
preConfigMapVolume.getOrElse(createConfigMapVolume.get)
-
-    val hadoopSupportedPod = new PodBuilder(pod.pod)
-      .editSpec()
-        .addNewVolumeLike(configMapVolume)
-          .endVolume()
-        .endSpec()
-        .build()
-
-    val hadoopSupportedContainer = new ContainerBuilder(pod.container)
-      .addNewVolumeMount()
-        .withName(HADOOP_FILE_VOLUME)
-        .withMountPath(HADOOP_CONF_DIR_PATH)
-        .endVolumeMount()
-      .addNewEnv()
-        .withName(ENV_HADOOP_CONF_DIR)
-        .withValue(HADOOP_CONF_DIR_PATH)
-        .endEnv()
-      .build()
-    SparkPod(hadoopSupportedPod, hadoopSupportedContainer)
-  }
-
-  /**
-   * Builds ConfigMap given the file location of the
-   * krb5.conf file
-   *
-   * @param configMapName name of configMap for krb5
-   * @param fileLocation location of krb5 file
-   * @return a ConfigMap
-   */
-  def buildkrb5ConfigMap(
-      configMapName: String,
-      fileLocation: String): ConfigMap = {
-    val file = new File(fileLocation)
-    new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(configMapName)
-        .endMetadata()
-      .addToData(Map(file.toPath.getFileName.toString ->
-        Files.toString(file, StandardCharsets.UTF_8)).asJava)
-      .build()
-  }
-
-  /**
-   * Builds ConfigMap given the ConfigMap name
-   * and a list of Hadoop Conf files
-   *
-   * @param hadoopConfigMapName name of hadoopConfigMap
-   * @param hadoopConfFiles list of hadoopFiles
-   * @return a ConfigMap
-   */
-  def buildHadoopConfigMap(
-      hadoopConfigMapName: String,
-      hadoopConfFiles: Seq[File]): ConfigMap = {
-    new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(hadoopConfigMapName)
-        .endMetadata()
-      .addToData(hadoopConfFiles.map { file =>
-        (file.toPath.getFileName.toString,
-          Files.toString(file, StandardCharsets.UTF_8))
-        }.toMap.asJava)
-      .build()
-  }
-
-}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
deleted file mode 100644
index 7f7ef21..0000000
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.features.hadooputils
-
-import io.fabric8.kubernetes.api.model.Secret
-
-/**
- * Represents a given configuration of the Kerberos Configuration logic
- * <p>
- * - The secret containing a DT, either previously specified or built on the 
fly
- * - The name of the secret where the DT will be stored
- * - The data item-key on the secret which correlates with where the current 
DT data is stored
- * - The Job User's username
- */
-private[spark] case class KerberosConfigSpec(
-    dtSecret: Option[Secret],
-    dtSecretName: String,
-    dtSecretItemKey: String,
-    jobUserName: String)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index d2c0ced..57e4060 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -46,6 +46,7 @@ private[spark] class KubernetesDriverBuilder {
       new LocalDirsFeatureStep(conf),
       new MountVolumesFeatureStep(conf),
       new DriverCommandFeatureStep(conf),
+      new HadoopConfDriverFeatureStep(conf),
       new KerberosConfDriverFeatureStep(conf),
       new PodTemplateConfigMapStep(conf))
 
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 03f5da2..cd29897 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
@@ -143,7 +144,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def createDriverEndpoint(properties: Seq[(String, String)]): 
DriverEndpoint = {
-    new KubernetesDriverEndpoint(rpcEnv, properties)
+    new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
+  }
+
+  override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = {
+    Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration))
   }
 
   private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 0b74966..48aa2c5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -44,10 +44,7 @@ private[spark] class KubernetesExecutorBuilder {
       new MountSecretsFeatureStep(conf),
       new EnvSecretsFeatureStep(conf),
       new LocalDirsFeatureStep(conf),
-      new MountVolumesFeatureStep(conf),
-      new HadoopConfExecutorFeatureStep(conf),
-      new KerberosConfExecutorFeatureStep(conf),
-      new HadoopSparkUserExecutorFeatureStep(conf))
+      new MountVolumesFeatureStep(conf))
 
     features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index e4951bc..5ceb9d6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
@@ -73,7 +74,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val foundPortNames = configuredPod.container.getPorts.asScala.toSet
     assert(expectedPortNames === foundPortNames)
 
-    assert(configuredPod.container.getEnv.size === 3)
     val envs = configuredPod.container
       .getEnv
       .asScala
@@ -82,6 +82,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     DRIVER_ENVS.foreach { case (k, v) =>
       assert(envs(v) === v)
     }
+    assert(envs(ENV_SPARK_USER) === Utils.getCurrentUserName())
 
     assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala ===
       TEST_IMAGE_PULL_SECRET_OBJECTS)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 05989d9..c2efab0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -200,7 +200,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       ENV_EXECUTOR_MEMORY -> "1g",
       ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
-      ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
+      ENV_EXECUTOR_POD_IP -> null,
+      ENV_SPARK_USER -> Utils.getCurrentUserName())
 
     val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
     val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
@@ -208,9 +209,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       s"$ENV_JAVA_OPT_PREFIX${ind + extraJavaOptsStart}" -> opt
     }.toMap
 
-    val mapEnvs = executorPod.container.getEnv.asScala.map {
+    val containerEnvs = executorPod.container.getEnv.asScala.map {
       x => (x.getName, x.getValue)
     }.toMap
-    assert((defaultEnvs ++ extraJavaOptsEnvs) === mapEnvs)
+
+    val expectedEnvs = defaultEnvs ++ additionalEnvVars ++ extraJavaOptsEnvs
+    assert(containerEnvs === expectedEnvs)
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..e1c01db
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.ConfigMap
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.util.{SparkConfWithEnv, Utils}
+
+class HadoopConfDriverFeatureStepSuite extends SparkFunSuite {
+
+  import KubernetesFeaturesTestUtils._
+  import SecretVolumeUtils._
+
+  test("mount hadoop config map if defined") {
+    val sparkConf = new SparkConf(false)
+      .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap")
+    val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+    val step = new HadoopConfDriverFeatureStep(conf)
+    checkPod(step.configurePod(SparkPod.initialPod()))
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("create hadoop config map if config dir is defined") {
+    val confDir = Utils.createTempDir()
+    val confFiles = Set("core-site.xml", "hdfs-site.xml")
+
+    confFiles.foreach { f =>
+      Files.write("some data", new File(confDir, f), UTF_8)
+    }
+
+    val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()))
+    val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+
+    val step = new HadoopConfDriverFeatureStep(conf)
+    checkPod(step.configurePod(SparkPod.initialPod()))
+
+    val hadoopConfMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
+    assert(hadoopConfMap.getData().keySet().asScala === confFiles)
+  }
+
+  private def checkPod(pod: SparkPod): Unit = {
+    assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
+    assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
+    assert(containerHasEnvVar(pod.container, ENV_HADOOP_CONF_DIR))
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..41ca3a9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStepSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{ConfigMap, Secret}
+import org.apache.commons.codec.binary.Base64
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+class KerberosConfDriverFeatureStepSuite extends SparkFunSuite {
+
+  import KubernetesFeaturesTestUtils._
+  import SecretVolumeUtils._
+
+  private val tmpDir = Utils.createTempDir()
+
+  test("mount krb5 config map if defined") {
+    val configMap = "testConfigMap"
+    val step = createStep(
+      new SparkConf(false).set(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP, configMap))
+
+    checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), configMap)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(filter[ConfigMap](step.getAdditionalKubernetesResources()).isEmpty)
+  }
+
+  test("create krb5.conf config map if local config provided") {
+    val krbConf = File.createTempFile("krb5", ".conf", tmpDir)
+    Files.write("some data", krbConf, UTF_8)
+
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_KERBEROS_KRB5_FILE, krbConf.getAbsolutePath())
+    val step = createStep(sparkConf)
+
+    val confMap = filter[ConfigMap](step.getAdditionalKubernetesResources()).head
+    assert(confMap.getData().keySet().asScala === Set(krbConf.getName()))
+
+    checkPodForKrbConf(step.configurePod(SparkPod.initialPod()), confMap.getMetadata().getName())
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+  }
+
+  test("create keytab secret if client keytab file used") {
+    val keytab = File.createTempFile("keytab", ".bin", tmpDir)
+    Files.write("some data", keytab, UTF_8)
+
+    val sparkConf = new SparkConf(false)
+      .set(KEYTAB, keytab.getAbsolutePath())
+      .set(PRINCIPAL, "alice")
+    val step = createStep(sparkConf)
+
+    val pod = step.configurePod(SparkPod.initialPod())
+    assert(podHasVolume(pod.pod, KERBEROS_KEYTAB_VOLUME))
+    assert(containerHasVolume(pod.container, KERBEROS_KEYTAB_VOLUME, KERBEROS_KEYTAB_MOUNT_POINT))
+
+    assert(step.getAdditionalPodSystemProperties().keys === Set(KEYTAB.key))
+
+    val secret = filter[Secret](step.getAdditionalKubernetesResources()).head
+    assert(secret.getData().keySet().asScala === Set(keytab.getName()))
+  }
+
+  test("do nothing if container-local keytab used") {
+    val sparkConf = new SparkConf(false)
+      .set(KEYTAB, "local:/my.keytab")
+      .set(PRINCIPAL, "alice")
+    val step = createStep(sparkConf)
+
+    val initial = SparkPod.initialPod()
+    assert(step.configurePod(initial) === initial)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("mount delegation tokens if provided") {
+    val dtSecret = "tokenSecret"
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, dtSecret)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtokens")
+    val step = createStep(sparkConf)
+
+    checkPodForTokens(step.configurePod(SparkPod.initialPod()), dtSecret)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  test("create delegation tokens if needed") {
+    // Since HadoopDelegationTokenManager does not create any tokens without proper configs and
+    // services, start with a test user that already has some tokens that will just be piped
+    // through to the driver.
+    val testUser = UserGroupInformation.createUserForTesting("k8s", Array())
+    testUser.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        val creds = testUser.getCredentials()
+        creds.addSecretKey(new Text("K8S_TEST_KEY"), Array[Byte](0x4, 0x2))
+        testUser.addCredentials(creds)
+
+        val tokens = SparkHadoopUtil.get.serialize(creds)
+
+        val step = createStep(new SparkConf(false))
+
+        val dtSecret = filter[Secret](step.getAdditionalKubernetesResources()).head
+        assert(dtSecret.getData().get(KERBEROS_SECRET_KEY) === Base64.encodeBase64String(tokens))
+
+        checkPodForTokens(step.configurePod(SparkPod.initialPod()),
+          dtSecret.getMetadata().getName())
+
+        assert(step.getAdditionalPodSystemProperties().isEmpty)
+      }
+    })
+  }
+
+  test("do nothing if no config and no tokens") {
+    val step = createStep(new SparkConf(false))
+    val initial = SparkPod.initialPod()
+    assert(step.configurePod(initial) === initial)
+    assert(step.getAdditionalPodSystemProperties().isEmpty)
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
+  private def checkPodForKrbConf(pod: SparkPod, confMapName: String): Unit = {
+    val podVolume = pod.pod.getSpec().getVolumes().asScala.find(_.getName() == KRB_FILE_VOLUME)
+    assert(podVolume.isDefined)
+    assert(containerHasVolume(pod.container, KRB_FILE_VOLUME, KRB_FILE_DIR_PATH + "/krb5.conf"))
+    assert(podVolume.get.getConfigMap().getName() === confMapName)
+  }
+
+  private def checkPodForTokens(pod: SparkPod, dtSecretName: String): Unit = {
+    val podVolume = pod.pod.getSpec().getVolumes().asScala
+      .find(_.getName() == SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+    assert(podVolume.isDefined)
+    assert(containerHasVolume(pod.container, SPARK_APP_HADOOP_SECRET_VOLUME_NAME,
+      SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR))
+    assert(containerHasEnvVar(pod.container, ENV_HADOOP_TOKEN_FILE_LOCATION))
+    assert(podVolume.get.getSecret().getSecretName() === dtSecretName)
+  }
+
+  private def createStep(conf: SparkConf): KerberosConfDriverFeatureStep = {
+    val kconf = KubernetesTestConf.createDriverConf(sparkConf = conf)
+    new KerberosConfDriverFeatureStep(kconf)
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index f90380e..076b681 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.deploy.k8s.features
 
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
 
 import io.fabric8.kubernetes.api.model.{Container, HasMetadata, PodBuilder, SecretBuilder}
 import org.mockito.Matchers
@@ -63,4 +64,9 @@ object KubernetesFeaturesTestUtils {
   def containerHasEnvVar(container: Container, envVarName: String): Boolean = {
     container.getEnv.asScala.exists(envVar => envVar.getName == envVarName)
   }
+
+  def filter[T: ClassTag](list: Seq[HasMetadata]): Seq[T] = {
+    val desired = implicitly[ClassTag[T]].runtimeClass
+    list.filter(_.getClass() == desired).map(_.asInstanceOf[T]).toSeq
+  }
 }
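The new ClassTag-based filter helper gives the suites a typed way to pull, say, the Secret or ConfigMap entries out of the mixed Seq[HasMetadata] a feature step returns. A minimal usage sketch; the `step` value stands in for whichever feature step is under test and is hypothetical here:

    import io.fabric8.kubernetes.api.model.{ConfigMap, HasMetadata, Secret}
    import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.filter

    // `resources` is whatever the step under test asks Kubernetes to create.
    val resources: Seq[HasMetadata] = step.getAdditionalKubernetesResources()
    val secrets: Seq[Secret] = filter[Secret](resources)          // only the Secret objects
    val configMaps: Seq[ConfigMap] = filter[ConfigMap](resources) // only the ConfigMap objects

Note the helper compares runtime classes with ==, so it matches the concrete fabric8 model classes exactly, which is all these suites need.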
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6240f7b..184fb6a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -116,6 +116,8 @@ private[spark] class Client(
     }
   }
 
+  require(keytab == null || !Utils.isLocalUri(keytab), "Keytab should reference a local file.")
+
   private val launcherBackend = new LauncherBackend() {
     override protected def conf: SparkConf = sparkConf
 
@@ -472,7 +474,7 @@ private[spark] class Client(
         appMasterOnly: Boolean = false): (Boolean, String) = {
       val trimmedPath = path.trim()
       val localURI = Utils.resolveURI(trimmedPath)
-      if (localURI.getScheme != LOCAL_SCHEME) {
+      if (localURI.getScheme != Utils.LOCAL_SCHEME) {
         if (addDistributedUri(localURI)) {
           val localPath = getQualifiedLocalPath(localURI, hadoopConf)
           val linkname = targetDir.map(_ + "/").getOrElse("") +
@@ -515,7 +517,7 @@ private[spark] class Client(
     val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
     if (sparkArchive.isDefined) {
       val archive = sparkArchive.get
-      require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+      require(!Utils.isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
       distribute(Utils.resolveURI(archive).toString,
         resType = LocalResourceType.ARCHIVE,
         destName = Some(LOCALIZED_LIB_DIR))
@@ -525,7 +527,7 @@ private[spark] class Client(
           // Break the list of jars to upload, and resolve globs.
           val localJars = new ArrayBuffer[String]()
           jars.foreach { jar =>
-            if (!isLocalUri(jar)) {
+            if (!Utils.isLocalUri(jar)) {
               val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
               val pathFs = FileSystem.get(path.toUri(), hadoopConf)
               pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
@@ -814,7 +816,7 @@ private[spark] class Client(
     }
     (pySparkArchives ++ pyArchives).foreach { path =>
       val uri = Utils.resolveURI(path)
-      if (uri.getScheme != LOCAL_SCHEME) {
+      if (uri.getScheme != Utils.LOCAL_SCHEME) {
         pythonPath += buildPath(Environment.PWD.$$(), new Path(uri).getName())
       } else {
         pythonPath += uri.getPath()
@@ -1183,9 +1185,6 @@ private object Client extends Logging {
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
 
-  // URI scheme that identifies local resources
-  val LOCAL_SCHEME = "local"
-
   // Staging directory for any temporary jars or files
   val SPARK_STAGING: String = ".sparkStaging"
 
@@ -1307,7 +1306,7 @@ private object Client extends Logging {
     addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, "*"), env)
     if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
       sparkConf.get(SPARK_JARS).foreach { jars =>
-        jars.filter(isLocalUri).foreach { jar =>
+        jars.filter(Utils.isLocalUri).foreach { jar =>
           val uri = new URI(jar)
           addClasspathEntry(getClusterPath(sparkConf, uri.getPath()), env)
         }
@@ -1340,7 +1339,7 @@ private object Client extends Logging {
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
-      if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
+      if (uri.getScheme == Utils.LOCAL_SCHEME) Some(uri) else None
     }.orElse(Some(new URI(APP_JAR_NAME)))
   }
 
@@ -1368,7 +1367,7 @@ private object Client extends Logging {
       uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
-    if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+    if (uri != null && uri.getScheme == Utils.LOCAL_SCHEME) {
       addClasspathEntry(getClusterPath(conf, uri.getPath), env)
     } else if (fileName != null) {
       addClasspathEntry(buildPath(Environment.PWD.$$(), fileName), env)
@@ -1489,11 +1488,6 @@ private object Client extends Logging {
     components.mkString(Path.SEPARATOR)
   }
 
-  /** Returns whether the URI is a "local:" URI. */
-  def isLocalUri(uri: String): Boolean = {
-    uri.startsWith(s"$LOCAL_SCHEME:")
-  }
-
   def createAppReport(report: ApplicationReport): YarnAppReport = {
     val diags = report.getDiagnostics()
     val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
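Client.scala now delegates the "local:" scheme handling to Utils. The Utils.scala hunk is not shown in this section, but given the definitions removed above and the call sites switched to Utils.LOCAL_SCHEME / Utils.isLocalUri, the shared helper presumably looks roughly like this (a sketch based on the code removed from Client):

    // In org.apache.spark.util.Utils:

    // URI scheme that identifies local resources
    val LOCAL_SCHEME = "local"

    /** Returns whether the URI is a "local:" URI. */
    def isLocalUri(uri: String): Boolean = {
      uri.startsWith(s"$LOCAL_SCHEME:")
    }

Sharing the check is what lets both the YARN client and the new k8s kerberos step treat a keytab like "local:/etc/krb5.keytab" as already present in the container, while a plain path is distributed as before.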
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index b3286e8..a6f57fc 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -100,7 +100,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
       val uri = new URI(entry)
-      if (LOCAL_SCHEME.equals(uri.getScheme())) {
+      if (Utils.LOCAL_SCHEME.equals(uri.getScheme())) {
         cp should contain (uri.getPath())
       } else {
         cp should not contain (uri.getPath())
@@ -136,7 +136,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       val expected = ADDED.split(",")
         .map(p => {
           val uri = new URI(p)
-          if (LOCAL_SCHEME == uri.getScheme()) {
+          if (Utils.LOCAL_SCHEME == uri.getScheme()) {
             p
           } else {
             Option(uri.getFragment()).getOrElse(new File(p).getName())
@@ -249,7 +249,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
 
-    sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
+    sparkConf.set(SPARK_ARCHIVE, Utils.LOCAL_SCHEME + ":" + archive.getPath())
     intercept[IllegalArgumentException] {
       client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
     }

