[SPARK-23257][K8S] Kerberos Support for Spark on K8S

## What changes were proposed in this pull request?
This is the work on setting up Secure HDFS interaction with Spark-on-K8S.
The architecture is discussed in this community-wide google 
[doc](https://docs.google.com/document/d/1RBnXD9jMDjGonOdKJ2bA1lN4AAV_1RwpU_ewFuCNWKg)
This initiative can be broken down into 4 Stages

**STAGE 1**
- [x] Detecting `HADOOP_CONF_DIR` environmental variable and using Config Maps 
to store all Hadoop config files locally, while also setting `HADOOP_CONF_DIR` 
locally in the driver / executors

**STAGE 2**
- [x] Grabbing `TGT` from `LTC` or using keytabs+principle and creating a `DT` 
that will be mounted as a secret or using a pre-populated secret

**STAGE 3**
- [x] Driver

**STAGE 4**
- [x] Executor

## How was this patch tested?
Locally tested on a single-noded, pseudo-distributed Kerberized Hadoop Cluster
- [x] E2E Integration tests https://github.com/apache/spark/pull/22608
- [ ] Unit tests

## Docs and Error Handling?
- [x] Docs
- [x] Error Handling

## Contribution Credit
kimoonkim skonto

Closes #21669 from ifilonenko/secure-hdfs.

Lead-authored-by: Ilan Filonenko <[email protected]>
Co-authored-by: Ilan Filonenko <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c9c84ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c9c84ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c9c84ff

Branch: refs/heads/master
Commit: 6c9c84ffb9c8d98ee2ece7ba4b010856591d383d
Parents: 0820484
Author: Ilan Filonenko <[email protected]>
Authored: Mon Oct 15 15:48:51 2018 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Mon Oct 15 15:48:51 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |   9 +-
 docs/running-on-kubernetes.md                   |  41 +++
 docs/security.md                                |  75 +++++
 .../org/apache/spark/examples/HdfsTest.scala    |   2 +
 .../org/apache/spark/deploy/k8s/Config.scala    |  37 +++
 .../org/apache/spark/deploy/k8s/Constants.scala |  27 ++
 .../spark/deploy/k8s/KubernetesConf.scala       |  42 ++-
 .../spark/deploy/k8s/KubernetesUtils.scala      |  19 ++
 .../HadoopConfExecutorFeatureStep.scala         |  48 ++++
 .../HadoopSparkUserExecutorFeatureStep.scala    |  43 +++
 .../KerberosConfDriverFeatureStep.scala         | 165 +++++++++++
 .../KerberosConfExecutorFeatureStep.scala       |  53 ++++
 .../hadooputils/HadoopBootstrapUtil.scala       | 283 +++++++++++++++++++
 .../hadooputils/HadoopKerberosLogin.scala       |  66 +++++
 .../hadooputils/KerberosConfigSpec.scala        |  33 +++
 ...KubernetesHadoopDelegationTokenManager.scala |  62 ++++
 .../submit/KubernetesClientApplication.scala    |  10 +-
 .../k8s/submit/KubernetesDriverBuilder.scala    |  18 +-
 .../cluster/k8s/KubernetesExecutorBuilder.scala |  38 ++-
 .../spark/deploy/k8s/KubernetesConfSuite.scala  |  21 +-
 .../features/BasicDriverFeatureStepSuite.scala  |  17 +-
 .../BasicExecutorFeatureStepSuite.scala         |  12 +-
 ...rKubernetesCredentialsFeatureStepSuite.scala |   9 +-
 .../DriverServiceFeatureStepSuite.scala         |  18 +-
 .../features/EnvSecretsFeatureStepSuite.scala   |   3 +-
 .../features/LocalDirsFeatureStepSuite.scala    |   3 +-
 .../features/MountSecretsFeatureStepSuite.scala |   3 +-
 .../features/MountVolumesFeatureStepSuite.scala |   3 +-
 .../bindings/JavaDriverFeatureStepSuite.scala   |   3 +-
 .../bindings/PythonDriverFeatureStepSuite.scala |   6 +-
 .../bindings/RDriverFeatureStepSuite.scala      |   3 +-
 .../spark/deploy/k8s/submit/ClientSuite.scala   |   3 +-
 .../submit/KubernetesDriverBuilderSuite.scala   |  93 +++++-
 .../k8s/ExecutorPodsAllocatorSuite.scala        |   1 +
 .../k8s/KubernetesExecutorBuilderSuite.scala    |  82 +++++-
 .../src/main/dockerfiles/spark/Dockerfile       |   2 +-
 .../src/main/dockerfiles/spark/entrypoint.sh    |   4 +
 37 files changed, 1290 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 61b379f..13fa6d0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -335,7 +335,7 @@ private[spark] class SparkSubmit extends Logging {
     val targetDir = Utils.createTempDir()
 
     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
+    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || 
isKubernetesCluster) {
       if (args.principal != null) {
         if (args.keytab != null) {
           require(new File(args.keytab).exists(), s"Keytab file: 
${args.keytab} does not exist")
@@ -646,7 +646,8 @@ private[spark] class SparkSubmit extends Logging {
       }
     }
 
-    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+    if ((clusterManager == MESOS || clusterManager == KUBERNETES)
+       && UserGroupInformation.isSecurityEnabled) {
       setRMPrincipal(sparkConf)
     }
 
@@ -762,8 +763,8 @@ private[spark] class SparkSubmit extends Logging {
   }
 
   // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches 
delegation tokens with
-  // renewer set to the YARN ResourceManager.  Since YARN isn't configured in 
Mesos mode, we
-  // must trick it into thinking we're YARN.
+  // renewer set to the YARN ResourceManager.  Since YARN isn't configured in 
Mesos or Kubernetes
+  // mode, we must trick it into thinking we're YARN.
   private def setRMPrincipal(sparkConf: SparkConf): Unit = {
     val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
     val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b4088d7..d629ed3 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -821,4 +821,45 @@ specific to Spark on Kubernetes.
    This sets the major Python version of the docker image used to run the 
driver and executor containers. Can either be 2 or 3. 
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.krb5.path</code></td>
+  <td><code>(none)</code></td>
+  <td>
+   Specify the local location of the krb5.conf file to be mounted on the 
driver and executors for Kerberos interaction.
+   It is important to note that the KDC defined needs to be visible from 
inside the containers.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.krb5.configMapName</code></td>
+  <td><code>(none)</code></td>
+  <td>
+   Specify the name of the ConfigMap, containing the krb5.conf file, to be 
mounted on the driver and executors
+   for Kerberos interaction. The KDC defined needs to be visible from inside 
the containers. The ConfigMap must also
+   be in the same namespace of the driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.hadoop.configMapName</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, 
to be mounted on the driver 
+    and executors for custom Hadoop configuration.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokenSecret.name</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Specify the name of the secret where your existing delegation tokens are 
stored. This removes the need for the job user
+    to provide any kerberos credentials for launching a job. 
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokenSecret.itemKey</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Specify the item key of the data where your existing delegation tokens are 
stored. This removes the need for the job user 
+    to provide any kerberos credentials for launching a job.
+  </td>
+</tr>
 </table>

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index 7fb3e17..ffae683 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -722,7 +722,82 @@ with encryption, at least.
 The Kerberos login will be periodically renewed using the provided 
credentials, and new delegation
 tokens for supported will be created.
 
+## Secure Interaction with Kubernetes
+
+When talking to Hadoop-based services behind Kerberos, it was noted that Spark 
needs to obtain delegation tokens
+so that non-local processes can authenticate. These delegation tokens in 
Kubernetes are stored in Secrets that are 
+shared by the Driver and its Executors. As such, there are three ways of 
submitting a Kerberos job: 
+
+In all cases you must define the environment variable: `HADOOP_CONF_DIR` or 
+`spark.kubernetes.hadoop.configMapName.`
+
+It also important to note that the KDC needs to be visible from inside the 
containers.
+
+If a user wishes to use a remote HADOOP_CONF directory, that contains the 
Hadoop configuration files, this could be
+achieved by setting `spark.kubernetes.hadoop.configMapName` to a pre-existing 
ConfigMap.
+
+1. Submitting with a $kinit that stores a TGT in the Local Ticket Cache:
+```bash
+/usr/bin/kinit -kt <keytab_file> <username>/<krb5 realm>
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
+2. Submitting with a local Keytab and Principal
+```bash
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kerberos.keytab=<KEYTAB_FILE> \
+    --conf spark.kerberos.principal=<PRINCIPAL> \
+    --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
 
+3. Submitting with pre-populated secrets, that contain the Delegation Token, 
already existing within the namespace
+```bash
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kubernetes.kerberos.tokenSecret.name=<SECRET_TOKEN_NAME> \
+    --conf spark.kubernetes.kerberos.tokenSecret.itemKey=<SECRET_ITEM_KEY> \
+    --conf spark.kubernetes.kerberos.krb5.path=/etc/krb5.conf \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
+
+3b. Submitting like in (3) however specifying a pre-created krb5 ConfigMap and 
pre-created `HADOOP_CONF_DIR` ConfigMap
+```bash
+/opt/spark/bin/spark-submit \
+    --deploy-mode cluster \
+    --class org.apache.spark.examples.HdfsTest \
+    --master k8s://<KUBERNETES_MASTER_ENDPOINT> \
+    --conf spark.executor.instances=1 \
+    --conf spark.app.name=spark-hdfs \
+    --conf spark.kubernetes.container.image=spark:latest \
+    --conf spark.kubernetes.kerberos.tokenSecret.name=<SECRET_TOKEN_NAME> \
+    --conf spark.kubernetes.kerberos.tokenSecret.itemKey=<SECRET_ITEM_KEY> \
+    --conf spark.kubernetes.hadoop.configMapName=<HCONF_CONFIG_MAP_NAME> \
+    --conf spark.kubernetes.kerberos.krb5.configMapName=<KRB_CONFIG_MAP_NAME> \
+    local:///opt/spark/examples/jars/spark-examples_<VERSION>.jar \
+    <HDFS_FILE_LOCATION>
+```
 # Event Logging
 
 If your applications are using event logging, the directory where the event 
logs go

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
index e1f985e..08af330 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -41,6 +41,8 @@ object HdfsTest {
       val end = System.currentTimeMillis()
       println(s"Iteration $iter took ${end-start} ms")
     }
+    println(s"File contents: 
${file.map(_.toString).take(1).mkString(",").slice(0, 10)}")
+    println(s"Returned length(s) of: ${file.map(_.length).sum().toString}")
     spark.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 71e4d32..c2ad80c 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -225,6 +225,43 @@ private[spark] object Config extends Logging {
         "Ensure that major Python version is either Python2 or Python3")
       .createWithDefault("2")
 
+  val KUBERNETES_KERBEROS_KRB5_FILE =
+    ConfigBuilder("spark.kubernetes.kerberos.krb5.path")
+      .doc("Specify the local location of the krb5.conf file to be mounted on 
the driver " +
+        "and executors for Kerberos. Note: The KDC defined needs to be " +
+        "visible from inside the containers ")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_KERBEROS_KRB5_CONFIG_MAP =
+    ConfigBuilder("spark.kubernetes.kerberos.krb5.configMapName")
+      .doc("Specify the name of the ConfigMap, containing the krb5.conf file, 
to be mounted " +
+        "on the driver and executors for Kerberos. Note: The KDC defined" +
+        "needs to be visible from inside the containers ")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_HADOOP_CONF_CONFIG_MAP =
+    ConfigBuilder("spark.kubernetes.hadoop.configMapName")
+      .doc("Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR 
files, " +
+        "to be mounted on the driver and executors for custom Hadoop 
configuration.")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_KERBEROS_DT_SECRET_NAME =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.name")
+      .doc("Specify the name of the secret where your existing delegation 
tokens are stored. " +
+        "This removes the need for the job user to provide any keytab for 
launching a job")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
+    ConfigBuilder("spark.kubernetes.kerberos.tokenSecret.itemKey")
+      .doc("Specify the item key of the data where your existing delegation 
tokens are stored. " +
+        "This removes the need for the job user to provide any keytab for 
launching a job")
+      .stringConf
+      .createOptional
+
   val APP_RESOURCE_TYPE =
     ConfigBuilder("spark.kubernetes.resource.type")
       .doc("This sets the resource type internally")

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 8202d87..172a905 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -60,11 +60,13 @@ private[spark] object Constants {
   val ENV_CLASSPATH = "SPARK_CLASSPATH"
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
   val ENV_SPARK_CONF_DIR = "SPARK_CONF_DIR"
+  val ENV_SPARK_USER = "SPARK_USER"
   // Spark app configs for containers
   val SPARK_CONF_VOLUME = "spark-conf-volume"
   val SPARK_CONF_DIR_INTERNAL = "/opt/spark/conf"
   val SPARK_CONF_FILE_NAME = "spark.properties"
   val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
+  val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // BINDINGS
   val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
@@ -78,4 +80,29 @@ private[spark] object Constants {
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc";
   val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   val MEMORY_OVERHEAD_MIN_MIB = 384L
+
+  // Hadoop Configuration
+  val HADOOP_FILE_VOLUME = "hadoop-properties"
+  val KRB_FILE_VOLUME = "krb5-file"
+  val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
+  val KRB_FILE_DIR_PATH = "/etc"
+  val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
+  val HADOOP_CONFIG_MAP_NAME =
+    "spark.kubernetes.executor.hadoopConfigMapName"
+  val KRB5_CONFIG_MAP_NAME =
+    "spark.kubernetes.executor.krb5ConfigMapName"
+
+  // Kerberos Configuration
+  val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
+  val KERBEROS_DT_SECRET_NAME =
+    "spark.kubernetes.kerberos.dt-secret-name"
+  val KERBEROS_DT_SECRET_KEY =
+    "spark.kubernetes.kerberos.dt-secret-key"
+  val KERBEROS_SPARK_USER_NAME =
+    "spark.kubernetes.kerberos.spark-user-name"
+  val KERBEROS_SECRET_KEY = "hadoop-tokens"
+
+  // Hadoop credentials secrets for the Spark app.
+  val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
+  val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index cae6e7d..3e30ab2 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -19,12 +19,15 @@ package org.apache.spark.deploy.k8s
 import scala.collection.mutable
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, 
LocalObjectReferenceBuilder, Pod}
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import 
org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.ConfigEntry
 
 
@@ -47,6 +50,13 @@ private[spark] case class KubernetesExecutorSpecificConf(
     driverPod: Option[Pod])
   extends KubernetesRoleSpecificConf
 
+/*
+ * Structure containing metadata for HADOOP_CONF_DIR customization
+ */
+private[spark] case class HadoopConfSpec(
+    hadoopConfDir: Option[String],
+    hadoopConfigMapName: Option[String])
+
 /**
  * Structure containing metadata for Kubernetes logic to build Spark pods.
  */
@@ -61,7 +71,15 @@ private[spark] case class KubernetesConf[T <: 
KubernetesRoleSpecificConf](
     roleSecretEnvNamesToKeyRefs: Map[String, String],
     roleEnvs: Map[String, String],
     roleVolumes: Iterable[KubernetesVolumeSpec[_ <: 
KubernetesVolumeSpecificConf]],
-    sparkFiles: Seq[String]) {
+    sparkFiles: Seq[String],
+    hadoopConfSpec: Option[HadoopConfSpec]) {
+
+  def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
+
+  def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
+
+  def tokenManager(conf: SparkConf, hConf: Configuration): 
KubernetesHadoopDelegationTokenManager =
+    new KubernetesHadoopDelegationTokenManager(new 
HadoopDelegationTokenManager(conf, hConf))
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
 
@@ -116,7 +134,8 @@ private[spark] object KubernetesConf {
       mainAppResource: Option[MainAppResource],
       mainClass: String,
       appArgs: Array[String],
-      maybePyFiles: Option[String]): 
KubernetesConf[KubernetesDriverSpecificConf] = {
+      maybePyFiles: Option[String],
+      hadoopConfDir: Option[String]): 
KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
     val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
@@ -175,6 +194,19 @@ private[spark] object KubernetesConf {
       .map(str => str.split(",").toSeq)
       .getOrElse(Seq.empty[String]) ++ additionalFiles
 
+    val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
+    KubernetesUtils.requireNandDefined(
+      hadoopConfDir,
+      hadoopConfigMapName,
+      "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap 
" +
+      "as the creation of an additional ConfigMap, when one is already 
specified is extraneous" )
+    val hadoopConfSpec =
+      if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
+        Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
+      } else {
+        None
+      }
+
     KubernetesConf(
       sparkConfWithMainAppJar,
       KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, 
appArgs),
@@ -186,7 +218,8 @@ private[spark] object KubernetesConf {
       driverSecretEnvNamesToKeyRefs,
       driverEnvs,
       driverVolumes,
-      sparkFiles)
+      sparkFiles,
+      hadoopConfSpec)
   }
 
   def createExecutorConf(
@@ -242,6 +275,7 @@ private[spark] object KubernetesConf {
       executorEnvSecrets,
       executorEnv,
       executorVolumes,
-      Seq.empty[String])
+      Seq.empty[String],
+      None)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index f5fae7c..8f36fa1 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -39,8 +39,27 @@ private[spark] object KubernetesUtils {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
+  def requireBothOrNeitherDefined(
+      opt1: Option[_],
+      opt2: Option[_],
+      errMessageWhenFirstIsMissing: String,
+      errMessageWhenSecondIsMissing: String): Unit = {
+    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
+    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
+  }
+
+  def requireSecondIfFirstIsDefined(
+      opt1: Option[_],
+      opt2: Option[_],
+      errMessageWhenSecondIsMissing: String): Unit = {
+    opt1.foreach { _ =>
+      require(opt2.isDefined, errMessageWhenSecondIsMissing)
+    }
+  }
+
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: 
String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
+    opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
new file mode 100644
index 0000000..fd09de2
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfExecutorFeatureStep.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
+import org.apache.spark.internal.Logging
+
+/**
+ * This step is responsible for bootstraping the container with ConfigMaps
+ * containing Hadoop config files mounted as volumes and an ENV variable
+ * pointed to the mounted file directory.
+ */
+private[spark] class HadoopConfExecutorFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep with Logging {
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val sparkConf = kubernetesConf.sparkConf
+    val hadoopConfDirCMapName = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
+    require(hadoopConfDirCMapName.isDefined,
+      "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client 
or " +
+        " using pre-existing ConfigMaps")
+    logInfo("HADOOP_CONF_DIR defined")
+    HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, 
hadoopConfDirCMapName, pod)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
new file mode 100644
index 0000000..5b6a6d5
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopSparkUserExecutorFeatureStep.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf
+import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
+import org.apache.spark.internal.Logging
+
+/**
+ * This step is responsible for setting ENV_SPARK_USER when HADOOP_FILES are 
detected
+ * however, this step would not be run if Kerberos is enabled, as Kerberos 
sets SPARK_USER
+ */
+private[spark] class HadoopSparkUserExecutorFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep with Logging {
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val sparkUserName = kubernetesConf.sparkConf.get(KERBEROS_SPARK_USER_NAME)
+    HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
new file mode 100644
index 0000000..ce47933
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfDriverFeatureStep.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesDriverSpecificConf
+import org.apache.spark.deploy.k8s.features.hadooputils._
+import org.apache.spark.internal.Logging
+
+/**
+ * Runs the necessary Hadoop-based logic based on Kerberos configs and the 
presence of the
+ * HADOOP_CONF_DIR. This runs various bootstrap methods defined in 
HadoopBootstrapUtil.
+ */
+private[spark] class KerberosConfDriverFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep with Logging {
+
+  require(kubernetesConf.hadoopConfSpec.isDefined,
+     "Ensure that HADOOP_CONF_DIR is defined either via env or a pre-defined 
ConfigMap")
+  private val hadoopConfDirSpec = kubernetesConf.hadoopConfSpec.get
+  private val conf = kubernetesConf.sparkConf
+  private val principal = conf.get(org.apache.spark.internal.config.PRINCIPAL)
+  private val keytab = conf.get(org.apache.spark.internal.config.KEYTAB)
+  private val existingSecretName = conf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME)
+  private val existingSecretItemKey = 
conf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY)
+  private val krb5File = conf.get(KUBERNETES_KERBEROS_KRB5_FILE)
+  private val krb5CMap = conf.get(KUBERNETES_KERBEROS_KRB5_CONFIG_MAP)
+  private val kubeTokenManager = kubernetesConf.tokenManager(conf,
+    SparkHadoopUtil.get.newConfiguration(conf))
+  private val isKerberosEnabled =
+    (hadoopConfDirSpec.hadoopConfDir.isDefined && 
kubeTokenManager.isSecurityEnabled) ||
+      (hadoopConfDirSpec.hadoopConfigMapName.isDefined &&
+        (krb5File.isDefined || krb5CMap.isDefined))
+  require(keytab.isEmpty || isKerberosEnabled,
+    "You must enable Kerberos support if you are specifying a Kerberos Keytab")
+
+  require(existingSecretName.isEmpty || isKerberosEnabled,
+    "You must enable Kerberos support if you are specifying a Kerberos Secret")
+
+  KubernetesUtils.requireNandDefined(
+    krb5File,
+    krb5CMap,
+    "Do not specify both a Krb5 local file and the ConfigMap as the creation " 
+
+       "of an additional ConfigMap, when one is already specified, is 
extraneous")
+
+  KubernetesUtils.requireBothOrNeitherDefined(
+    keytab,
+    principal,
+    "If a Kerberos principal is specified you must also specify a Kerberos 
keytab",
+    "If a Kerberos keytab is specified you must also specify a Kerberos 
principal")
+
+  KubernetesUtils.requireBothOrNeitherDefined(
+    existingSecretName,
+    existingSecretItemKey,
+    "If a secret data item-key where the data of the Kerberos Delegation Token 
is specified" +
+      " you must also specify the name of the secret",
+    "If a secret storing a Kerberos Delegation Token is specified you must 
also" +
+      " specify the item-key where the data is stored")
+
+  private val hadoopConfigurationFiles = hadoopConfDirSpec.hadoopConfDir.map { 
hConfDir =>
+    HadoopBootstrapUtil.getHadoopConfFiles(hConfDir)
+  }
+  private val newHadoopConfigMapName =
+    if (hadoopConfDirSpec.hadoopConfigMapName.isEmpty) {
+      Some(kubernetesConf.hadoopConfigMapName)
+    } else {
+      None
+    }
+
+  // Either use pre-existing secret or login to create new Secret with DT 
stored within
+  private val kerberosConfSpec: Option[KerberosConfigSpec] = (for {
+    secretName <- existingSecretName
+    secretItemKey <- existingSecretItemKey
+  } yield {
+    KerberosConfigSpec(
+      dtSecret = None,
+      dtSecretName = secretName,
+      dtSecretItemKey = secretItemKey,
+      jobUserName = kubeTokenManager.getCurrentUser.getShortUserName)
+  }).orElse(
+    if (isKerberosEnabled) {
+      Some(HadoopKerberosLogin.buildSpec(
+        conf,
+        kubernetesConf.appResourceNamePrefix,
+        kubeTokenManager))
+    } else {
+      None
+    }
+  )
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val hadoopBasedSparkPod = HadoopBootstrapUtil.bootstrapHadoopConfDir(
+      hadoopConfDirSpec.hadoopConfDir,
+      newHadoopConfigMapName,
+      hadoopConfDirSpec.hadoopConfigMapName,
+      pod)
+    kerberosConfSpec.map { hSpec =>
+      HadoopBootstrapUtil.bootstrapKerberosPod(
+        hSpec.dtSecretName,
+        hSpec.dtSecretItemKey,
+        hSpec.jobUserName,
+        krb5File,
+        Some(kubernetesConf.krbConfigMapName),
+        krb5CMap,
+        hadoopBasedSparkPod)
+    }.getOrElse(
+      HadoopBootstrapUtil.bootstrapSparkUserPod(
+        kubeTokenManager.getCurrentUser.getShortUserName,
+        hadoopBasedSparkPod))
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    val resolvedConfValues = kerberosConfSpec.map { hSpec =>
+      Map(KERBEROS_DT_SECRET_NAME -> hSpec.dtSecretName,
+        KERBEROS_DT_SECRET_KEY -> hSpec.dtSecretItemKey,
+        KERBEROS_SPARK_USER_NAME -> hSpec.jobUserName,
+        KRB5_CONFIG_MAP_NAME -> 
krb5CMap.getOrElse(kubernetesConf.krbConfigMapName))
+      }.getOrElse(
+        Map(KERBEROS_SPARK_USER_NAME ->
+          kubeTokenManager.getCurrentUser.getShortUserName))
+    Map(HADOOP_CONFIG_MAP_NAME ->
+      hadoopConfDirSpec.hadoopConfigMapName.getOrElse(
+      kubernetesConf.hadoopConfigMapName)) ++ resolvedConfValues
+  }
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+    val hadoopConfConfigMap = for {
+      hName <- newHadoopConfigMapName
+      hFiles <- hadoopConfigurationFiles
+    } yield {
+      HadoopBootstrapUtil.buildHadoopConfigMap(hName, hFiles)
+    }
+
+    val krb5ConfigMap = krb5File.map { fileLocation =>
+      HadoopBootstrapUtil.buildkrb5ConfigMap(
+        kubernetesConf.krbConfigMapName,
+        fileLocation)
+    }
+
+    val kerberosDTSecret = kerberosConfSpec.flatMap(_.dtSecret)
+
+    hadoopConfConfigMap.toSeq ++
+      krb5ConfigMap.toSeq ++
+      kerberosDTSecret.toSeq
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
new file mode 100644
index 0000000..06a88b6
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KerberosConfExecutorFeatureStep.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.HasMetadata
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesExecutorSpecificConf
+import org.apache.spark.deploy.k8s.features.hadooputils.HadoopBootstrapUtil
+import org.apache.spark.internal.Logging
+
+/**
+ * This step is responsible for mounting the DT secret for the executors
+ */
+private[spark] class KerberosConfExecutorFeatureStep(
+    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf])
+  extends KubernetesFeatureConfigStep with Logging {
+
+  private val sparkConf = kubernetesConf.sparkConf
+  private val maybeKrb5CMap = sparkConf.getOption(KRB5_CONFIG_MAP_NAME)
+  require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found")
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    logInfo(s"Mounting Resources for Kerberos")
+    HadoopBootstrapUtil.bootstrapKerberosPod(
+      sparkConf.get(KERBEROS_DT_SECRET_NAME),
+      sparkConf.get(KERBEROS_DT_SECRET_KEY),
+      sparkConf.get(KERBEROS_SPARK_USER_NAME),
+      None,
+      None,
+      maybeKrb5CMap,
+      pod)
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = 
Map.empty
+
+  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = 
Seq.empty[HasMetadata]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
new file mode 100644
index 0000000..5bee766
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopBootstrapUtil.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.hadooputils
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkPod
+import org.apache.spark.internal.Logging
+
+private[spark] object HadoopBootstrapUtil extends Logging {
+
+  /**
+   * Mounting the DT secret for both the Driver and the executors
+   *
+   * @param dtSecretName Name of the secret that stores the Delegation Token
+   * @param dtSecretItemKey Name of the Item Key storing the Delegation Token
+   * @param userName Name of the SparkUser to set SPARK_USER
+   * @param fileLocation Optional Location of the krb5 file
+   * @param newKrb5ConfName Optional location of the ConfigMap for Krb5
+   * @param existingKrb5ConfName Optional name of ConfigMap for Krb5
+   * @param pod Input pod to be appended to
+   * @return a modified SparkPod
+   */
+  def bootstrapKerberosPod(
+      dtSecretName: String,
+      dtSecretItemKey: String,
+      userName: String,
+      fileLocation: Option[String],
+      newKrb5ConfName: Option[String],
+      existingKrb5ConfName: Option[String],
+      pod: SparkPod): SparkPod = {
+
+    val preConfigMapVolume = existingKrb5ConfName.map { kconf =>
+      new VolumeBuilder()
+        .withName(KRB_FILE_VOLUME)
+        .withNewConfigMap()
+          .withName(kconf)
+          .endConfigMap()
+        .build()
+    }
+
+    val createConfigMapVolume = for {
+      fLocation <- fileLocation
+      krb5ConfName <- newKrb5ConfName
+    } yield {
+      val krb5File = new File(fLocation)
+      val fileStringPath = krb5File.toPath.getFileName.toString
+      new VolumeBuilder()
+        .withName(KRB_FILE_VOLUME)
+        .withNewConfigMap()
+        .withName(krb5ConfName)
+        .withItems(new KeyToPathBuilder()
+          .withKey(fileStringPath)
+          .withPath(fileStringPath)
+          .build())
+        .endConfigMap()
+        .build()
+    }
+
+    // Breaking up Volume creation for clarity
+    val configMapVolume = preConfigMapVolume.orElse(createConfigMapVolume)
+    if (configMapVolume.isEmpty) {
+       logInfo("You have not specified a krb5.conf file locally or via a 
ConfigMap. " +
+         "Make sure that you have the krb5.conf locally on the Driver and 
Executor images")
+    }
+
+    val kerberizedPodWithDTSecret = new PodBuilder(pod.pod)
+      .editOrNewSpec()
+        .addNewVolume()
+          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+          .withNewSecret()
+            .withSecretName(dtSecretName)
+            .endSecret()
+          .endVolume()
+        .endSpec()
+      .build()
+
+    // Optionally add the krb5.conf ConfigMap
+    val kerberizedPod = configMapVolume.map { cmVolume =>
+      new PodBuilder(kerberizedPodWithDTSecret)
+        .editSpec()
+          .addNewVolumeLike(cmVolume)
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(kerberizedPodWithDTSecret)
+
+    val kerberizedContainerWithMounts = new ContainerBuilder(pod.container)
+      .addNewVolumeMount()
+        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
+        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
+        .endVolumeMount()
+      .addNewEnv()
+        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
+        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$dtSecretItemKey")
+        .endEnv()
+      .addNewEnv()
+        .withName(ENV_SPARK_USER)
+        .withValue(userName)
+        .endEnv()
+      .build()
+
+    // Optionally add the krb5.conf Volume Mount
+    val kerberizedContainer =
+      if (configMapVolume.isDefined) {
+        new ContainerBuilder(kerberizedContainerWithMounts)
+          .addNewVolumeMount()
+            .withName(KRB_FILE_VOLUME)
+            .withMountPath(KRB_FILE_DIR_PATH + "/krb5.conf")
+            .withSubPath("krb5.conf")
+            .endVolumeMount()
+          .build()
+      } else {
+        kerberizedContainerWithMounts
+      }
+
+    SparkPod(kerberizedPod, kerberizedContainer)
+  }
+
+  /**
+   * setting ENV_SPARK_USER when HADOOP_FILES are detected
+   *
+   * @param sparkUserName Name of the SPARK_USER
+   * @param pod Input pod to be appended to
+   * @return a modified SparkPod
+   */
+  def bootstrapSparkUserPod(sparkUserName: String, pod: SparkPod): SparkPod = {
+    val envModifiedContainer = new ContainerBuilder(pod.container)
+      .addNewEnv()
+        .withName(ENV_SPARK_USER)
+        .withValue(sparkUserName)
+        .endEnv()
+      .build()
+    SparkPod(pod.pod, envModifiedContainer)
+  }
+
+  /**
+   * Grabbing files in the HADOOP_CONF_DIR
+   *
+   * @param path location of HADOOP_CONF_DIR
+   * @return a list of File object
+   */
+  def getHadoopConfFiles(path: String): Seq[File] = {
+    val dir = new File(path)
+    if (dir.isDirectory) {
+      dir.listFiles.filter(_.isFile).toSeq
+    } else {
+      Seq.empty[File]
+    }
+  }
+
+  /**
+   * Bootstraping the container with ConfigMaps that store
+   * Hadoop configuration files
+   *
+   * @param hadoopConfDir directory location of HADOOP_CONF_DIR env
+   * @param newHadoopConfigMapName name of the new configMap for 
HADOOP_CONF_DIR
+   * @param existingHadoopConfigMapName name of the pre-defined configMap for 
HADOOP_CONF_DIR
+   * @param pod Input pod to be appended to
+   * @return a modified SparkPod
+   */
+  def bootstrapHadoopConfDir(
+      hadoopConfDir: Option[String],
+      newHadoopConfigMapName: Option[String],
+      existingHadoopConfigMapName: Option[String],
+      pod: SparkPod): SparkPod = {
+    val preConfigMapVolume = existingHadoopConfigMapName.map { hConf =>
+      new VolumeBuilder()
+        .withName(HADOOP_FILE_VOLUME)
+        .withNewConfigMap()
+          .withName(hConf)
+          .endConfigMap()
+        .build() }
+
+    val createConfigMapVolume = for {
+      dirLocation <- hadoopConfDir
+      hConfName <- newHadoopConfigMapName
+    } yield {
+      val hadoopConfigFiles = getHadoopConfFiles(dirLocation)
+      val keyPaths = hadoopConfigFiles.map { file =>
+        val fileStringPath = file.toPath.getFileName.toString
+        new KeyToPathBuilder()
+          .withKey(fileStringPath)
+          .withPath(fileStringPath)
+          .build()
+      }
+      new VolumeBuilder()
+        .withName(HADOOP_FILE_VOLUME)
+        .withNewConfigMap()
+          .withName(hConfName)
+          .withItems(keyPaths.asJava)
+          .endConfigMap()
+        .build()
+    }
+
+    // Breaking up Volume Creation for clarity
+    val configMapVolume = 
preConfigMapVolume.getOrElse(createConfigMapVolume.get)
+
+    val hadoopSupportedPod = new PodBuilder(pod.pod)
+      .editSpec()
+        .addNewVolumeLike(configMapVolume)
+          .endVolume()
+        .endSpec()
+        .build()
+
+    val hadoopSupportedContainer = new ContainerBuilder(pod.container)
+      .addNewVolumeMount()
+        .withName(HADOOP_FILE_VOLUME)
+        .withMountPath(HADOOP_CONF_DIR_PATH)
+        .endVolumeMount()
+      .addNewEnv()
+        .withName(ENV_HADOOP_CONF_DIR)
+        .withValue(HADOOP_CONF_DIR_PATH)
+        .endEnv()
+      .build()
+    SparkPod(hadoopSupportedPod, hadoopSupportedContainer)
+  }
+
+  /**
+   * Builds ConfigMap given the file location of the
+   * krb5.conf file
+   *
+   * @param configMapName name of configMap for krb5
+   * @param fileLocation location of krb5 file
+   * @return a ConfigMap
+   */
+  def buildkrb5ConfigMap(
+      configMapName: String,
+      fileLocation: String): ConfigMap = {
+    val file = new File(fileLocation)
+    new ConfigMapBuilder()
+      .withNewMetadata()
+        .withName(configMapName)
+        .endMetadata()
+      .addToData(Map(file.toPath.getFileName.toString ->
+        Files.toString(file, StandardCharsets.UTF_8)).asJava)
+      .build()
+  }
+
+  /**
+   * Builds ConfigMap given the ConfigMap name
+   * and a list of Hadoop Conf files
+   *
+   * @param hadoopConfigMapName name of hadoopConfigMap
+   * @param hadoopConfFiles list of hadoopFiles
+   * @return a ConfigMap
+   */
+  def buildHadoopConfigMap(
+      hadoopConfigMapName: String,
+      hadoopConfFiles: Seq[File]): ConfigMap = {
+    new ConfigMapBuilder()
+      .withNewMetadata()
+        .withName(hadoopConfigMapName)
+        .endMetadata()
+      .addToData(hadoopConfFiles.map { file =>
+        (file.toPath.getFileName.toString,
+          Files.toString(file, StandardCharsets.UTF_8))
+        }.toMap.asJava)
+      .build()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
new file mode 100644
index 0000000..67a5849
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/HadoopKerberosLogin.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.hadooputils
+
+import io.fabric8.kubernetes.api.model.SecretBuilder
+import org.apache.commons.codec.binary.Base64
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.k8s.Constants._
+import 
org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
+
+/**
+ * This logic does all the heavy lifting for Delegation Token creation. This 
step
+ * assumes that the job user has either specified a principal and keytab or ran
+ * $kinit before running spark-submit. By running UGI.getCurrentUser we are 
able
+ * to obtain the current user, either signed in via $kinit or keytab. With the
+ * Job User principal you then retrieve the delegation token from the NameNode
+ * and store values in DelegationToken. Lastly, the class puts the data into
+ * a secret. All this is defined in a KerberosConfigSpec.
+ */
+private[spark] object HadoopKerberosLogin {
+  def buildSpec(
+      submissionSparkConf: SparkConf,
+      kubernetesResourceNamePrefix: String,
+      tokenManager: KubernetesHadoopDelegationTokenManager): 
KerberosConfigSpec = {
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf)
+    // The JobUserUGI will be taken fom the Local Ticket Cache or via 
keytab+principal
+    // The login happens in the SparkSubmit so login logic is not necessary to 
include
+    val jobUserUGI = tokenManager.getCurrentUser
+    val originalCredentials = jobUserUGI.getCredentials
+    val (tokenData, renewalInterval) = tokenManager.getDelegationTokens(
+      originalCredentials,
+      submissionSparkConf,
+      hadoopConf)
+    require(tokenData.nonEmpty, "Did not obtain any delegation tokens")
+    val initialTokenDataKeyName = KERBEROS_SECRET_KEY
+    val newSecretName = 
s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME"
+    val secretDT =
+      new SecretBuilder()
+        .withNewMetadata()
+          .withName(newSecretName)
+          .endMetadata()
+        .addToData(initialTokenDataKeyName, 
Base64.encodeBase64String(tokenData))
+        .build()
+    KerberosConfigSpec(
+      dtSecret = Some(secretDT),
+      dtSecretName = newSecretName,
+      dtSecretItemKey = initialTokenDataKeyName,
+      jobUserName = jobUserUGI.getShortUserName)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
new file mode 100644
index 0000000..7f7ef21
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/hadooputils/KerberosConfigSpec.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.hadooputils
+
+import io.fabric8.kubernetes.api.model.Secret
+
+/**
+ * Represents a given configuration of the Kerberos Configuration logic
+ * <p>
+ * - The secret containing a DT, either previously specified or built on the 
fly
+ * - The name of the secret where the DT will be stored
+ * - The data item-key on the secret which correlates with where the current 
DT data is stored
+ * - The Job User's username
+ */
+private[spark] case class KerberosConfigSpec(
+    dtSecret: Option[Secret],
+    dtSecretName: String,
+    dtSecretItemKey: String,
+    jobUserName: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
new file mode 100644
index 0000000..135e2c4
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/security/KubernetesHadoopDelegationTokenManager.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+
+/**
+ * The KubernetesHadoopDelegationTokenManager fetches Hadoop delegation tokens
+ * on the behalf of the Kubernetes submission client. The new credentials
+ * (called Tokens when they are serialized) are stored in Secrets accessible
+ * to the driver and executors, when new Tokens are received they overwrite 
the current Secrets.
+ */
+private[spark] class KubernetesHadoopDelegationTokenManager(
+    tokenManager: HadoopDelegationTokenManager) extends Logging {
+
+  // HadoopUGI Util methods
+  def getCurrentUser: UserGroupInformation = 
UserGroupInformation.getCurrentUser
+  def getShortUserName: String = getCurrentUser.getShortUserName
+  def getFileSystem(hadoopConf: Configuration): FileSystem = 
FileSystem.get(hadoopConf)
+  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
+  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): 
UserGroupInformation =
+    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+  def serializeCreds(creds: Credentials): Array[Byte] = 
SparkHadoopUtil.get.serialize(creds)
+  def nextRT(rt: Long, conf: SparkConf): Long = 
SparkHadoopUtil.nextCredentialRenewalTime(rt, conf)
+
+  def getDelegationTokens(
+      creds: Credentials,
+      conf: SparkConf,
+      hadoopConf: Configuration): (Array[Byte], Long) = {
+    try {
+      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
+      logDebug(s"Initialized tokens")
+      (serializeCreds(creds), nextRT(rt, conf))
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to fetch Hadoop delegation tokens $e")
+        throw e
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index af3903a..c658756 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.hadoop.security.UserGroupInformation
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
@@ -45,7 +46,8 @@ private[spark] case class ClientArguments(
     mainAppResource: Option[MainAppResource],
     mainClass: String,
     driverArgs: Array[String],
-    maybePyFiles: Option[String])
+    maybePyFiles: Option[String],
+    hadoopConfigDir: Option[String])
 
 private[spark] object ClientArguments {
 
@@ -79,7 +81,8 @@ private[spark] object ClientArguments {
       mainAppResource,
       mainClass.get,
       driverArgs.toArray,
-      maybePyFiles)
+      maybePyFiles,
+      sys.env.get(ENV_HADOOP_CONF_DIR))
   }
 }
 
@@ -222,7 +225,8 @@ private[spark] class KubernetesClientApplication extends 
SparkApplication {
       clientArguments.mainAppResource,
       clientArguments.mainClass,
       clientArguments.driverArgs,
-      clientArguments.maybePyFiles)
+      clientArguments.maybePyFiles,
+      clientArguments.hadoopConfigDir)
     val builder = new KubernetesDriverBuilder
     val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 8f3f18f..b0b5332 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.deploy.k8s.submit
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, 
KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, 
DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, 
EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, 
MountVolumesFeatureStep}
+import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, 
PythonDriverFeatureStep, RDriverFeatureStep}
 
 private[spark] class KubernetesDriverBuilder(
@@ -51,7 +51,11 @@ private[spark] class KubernetesDriverBuilder(
     provideJavaStep: (
       KubernetesConf[KubernetesDriverSpecificConf]
         => JavaDriverFeatureStep) =
-    new JavaDriverFeatureStep(_)) {
+    new JavaDriverFeatureStep(_),
+    provideHadoopGlobalStep: (
+      KubernetesConf[KubernetesDriverSpecificConf]
+        => KerberosConfDriverFeatureStep) =
+    new KerberosConfDriverFeatureStep(_))  {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): 
KubernetesDriverSpec = {
@@ -80,8 +84,14 @@ private[spark] class KubernetesDriverBuilder(
           provideRStep(kubernetesConf)}
       .getOrElse(provideJavaStep(kubernetesConf))
 
-    val allFeatures = (baseFeatures :+ bindingsStep) ++
-      secretFeature ++ envSecretFeature ++ volumesFeature
+    val maybeHadoopConfigStep =
+      kubernetesConf.hadoopConfSpec.map { _ =>
+          provideHadoopGlobalStep(kubernetesConf)}
+
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
+      (baseFeatures :+ bindingsStep) ++
+        secretFeature ++ envSecretFeature ++ volumesFeature ++
+        maybeHadoopConfigStep.toSeq
 
     var spec = 
KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
     for (feature <- allFeatures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 364b6fb..6199a8a 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import org.apache.spark.deploy.k8s.{KubernetesConf, 
KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, 
EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
 
 private[spark] class KubernetesExecutorBuilder(
     provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
@@ -35,10 +35,26 @@ private[spark] class KubernetesExecutorBuilder(
       new LocalDirsFeatureStep(_),
     provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => MountVolumesFeatureStep) =
-      new MountVolumesFeatureStep(_)) {
+    new MountVolumesFeatureStep(_),
+    provideHadoopConfStep: (
+      KubernetesConf[KubernetesExecutorSpecificConf]
+        => HadoopConfExecutorFeatureStep) =
+    new HadoopConfExecutorFeatureStep(_),
+    provideKerberosConfStep: (
+      KubernetesConf[KubernetesExecutorSpecificConf]
+        => KerberosConfExecutorFeatureStep) =
+    new KerberosConfExecutorFeatureStep(_),
+    provideHadoopSparkUserStep: (
+      KubernetesConf[KubernetesExecutorSpecificConf]
+        => HadoopSparkUserExecutorFeatureStep) =
+    new HadoopSparkUserExecutorFeatureStep(_)) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod 
= {
+    val sparkConf = kubernetesConf.sparkConf
+    val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
+    val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
+    val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)
 
     val baseFeatures = Seq(provideBasicStep(kubernetesConf), 
provideLocalDirsStep(kubernetesConf))
     val secretFeature = if 
(kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
@@ -51,7 +67,23 @@ private[spark] class KubernetesExecutorBuilder(
       Seq(provideVolumesStep(kubernetesConf))
     } else Nil
 
-    val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ 
volumesFeature
+    val maybeHadoopConfFeatureSteps = maybeHadoopConfigMap.map { _ =>
+      val maybeKerberosStep =
+        if (maybeDTSecretName.isDefined && maybeDTDataItem.isDefined) {
+          provideKerberosConfStep(kubernetesConf)
+        } else {
+          provideHadoopSparkUserStep(kubernetesConf)
+        }
+      Seq(provideHadoopConfStep(kubernetesConf)) :+
+        maybeKerberosStep
+    }.getOrElse(Seq.empty[KubernetesFeatureConfigStep])
+
+    val allFeatures: Seq[KubernetesFeatureConfigStep] =
+      baseFeatures ++
+      secretFeature ++
+      secretEnvFeature ++
+      volumesFeature ++
+      maybeHadoopConfFeatureSteps
 
     var executorPod = SparkPod.initialPod()
     for (feature <- allFeatures) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index e3c19cd..bb2b94f 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -59,7 +59,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppResource = None,
       MAIN_CLASS,
       APP_ARGS,
-      maybePyFiles = None)
+      maybePyFiles = None,
+      hadoopConfDir = None)
     assert(conf.appId === APP_ID)
     assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
     assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
@@ -81,7 +82,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppJar,
       MAIN_CLASS,
       APP_ARGS,
-      maybePyFiles = None)
+      maybePyFiles = None,
+      hadoopConfDir = None)
     assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars")
       .split(",")
       === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar"))
@@ -93,7 +95,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppResource = None,
       MAIN_CLASS,
       APP_ARGS,
-      maybePyFiles = None)
+      maybePyFiles = None,
+      hadoopConfDir = None)
     assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
     assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) 
=== 0.1)
@@ -114,7 +117,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppResource,
       MAIN_CLASS,
       APP_ARGS,
-      Some(inputPyFiles.mkString(",")))
+      Some(inputPyFiles.mkString(",")),
+      hadoopConfDir = None)
     
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
     
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 
0.4)
@@ -136,7 +140,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppResource,
       MAIN_CLASS,
       APP_ARGS,
-      maybePyFiles = None)
+      maybePyFiles = None,
+      hadoopConfDir = None)
     
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
       === Array("local:///opt/spark/jar1.jar"))
     
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 
0.4)
@@ -158,7 +163,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppResource,
       MAIN_CLASS,
       APP_ARGS,
-      None)
+      maybePyFiles = None,
+      hadoopConfDir = None)
     assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
   }
 
@@ -189,7 +195,8 @@ class KubernetesConfSuite extends SparkFunSuite {
       mainAppResource = None,
       MAIN_CLASS,
       APP_ARGS,
-      maybePyFiles = None)
+      maybePyFiles = None,
+      hadoopConfDir = None)
     assert(conf.roleLabels === Map(
       SPARK_APP_ID_LABEL -> APP_ID,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 0968cce..eebdd15 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -77,7 +77,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
 
     val featureStep = new BasicDriverFeatureStep(kubernetesConf)
     val basePod = SparkPod.initialPod()
@@ -139,7 +140,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(CONTAINER_IMAGE, "spark-driver:latest")
     val pythonSparkConf = new SparkConf()
       .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g")
-      .set(CONTAINER_IMAGE, "spark-driver:latest")
+      .set(CONTAINER_IMAGE, "spark-driver-py:latest")
     val javaKubernetesConf = KubernetesConf(
       javaSparkConf,
       KubernetesDriverSpecificConf(
@@ -155,7 +156,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
+
     val pythonKubernetesConf = KubernetesConf(
       pythonSparkConf,
       KubernetesDriverSpecificConf(
@@ -171,12 +174,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
     val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
     val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
     val basePod = SparkPod.initialPod()
     val configuredJavaPod = javaFeatureStep.configurePod(basePod)
     val configuredPythonPod = pythonFeatureStep.configurePod(basePod)
+    assert(configuredJavaPod.container.getImage === "spark-driver:latest")
+    assert(configuredPythonPod.container.getImage === "spark-driver-py:latest")
   }
 
   test("Additional system properties resolve jars and set cluster-mode 
confs.") {
@@ -198,7 +204,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       DRIVER_ENVS,
       Nil,
-      allFiles)
+      allFiles,
+      hadoopConfSpec = None)
 
     val step = new BasicDriverFeatureStep(kubernetesConf)
     val additionalProperties = step.getAdditionalPodSystemProperties()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 63b237b..41f34bd 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -91,7 +91,8 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     val executor = step.configurePod(SparkPod.initialPod())
 
     // The executor pod name and default labels.
@@ -131,7 +132,8 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length 
=== 63)
   }
 
@@ -152,7 +154,8 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map("qux" -> "quux"),
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     val executor = step.configurePod(SparkPod.initialPod())
 
     checkEnv(executor,
@@ -179,7 +182,8 @@ class BasicExecutorFeatureStepSuite
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     val executor = step.configurePod(SparkPod.initialPod())
     // This is checking that basic executor + executorMemory = 1408 + 42 = 1450
     assert(executor.container.getResources.getRequests.get("memory").getAmount 
=== "1450Mi")

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 7e916b3..8675ceb 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -62,7 +62,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends 
SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
     val kubernetesCredentialsStep = new 
DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === 
BASE_DRIVER_POD)
     
assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
@@ -94,7 +95,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends 
SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
 
     val kubernetesCredentialsStep = new 
DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === 
BASE_DRIVER_POD)
@@ -133,7 +135,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends 
SparkFunSuite with Bef
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
     val kubernetesCredentialsStep = new 
DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     val resolvedProperties = 
kubernetesCredentialsStep.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 8b91e93..5c3e801 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -68,7 +68,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     assert(configurationStep.configurePod(SparkPod.initialPod()) === 
SparkPod.initialPod())
     assert(configurationStep.getAdditionalKubernetesResources().size === 1)
     
assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
@@ -100,7 +101,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
       DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
@@ -122,7 +124,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]))
+        Seq.empty[String],
+        hadoopConfSpec = None))
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
       .head
@@ -153,7 +156,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
         Map.empty,
         Map.empty,
         Nil,
-        Seq.empty[String]),
+        Seq.empty[String],
+        hadoopConfSpec = None),
       clock)
     val driverService = configurationStep
       .getAdditionalKubernetesResources()
@@ -181,7 +185,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Nil,
-          Seq.empty[String]),
+          Seq.empty[String],
+          hadoopConfSpec = None),
         clock)
       fail("The driver bind address should not be allowed.")
     } catch {
@@ -207,7 +212,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
           Map.empty,
           Map.empty,
           Nil,
-          Seq.empty[String]),
+          Seq.empty[String],
+          hadoopConfSpec = None),
         clock)
       fail("The driver host address should not be allowed.")
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 85c6cb2..43796b7 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -46,7 +46,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
       envVarsToKeys,
       Map.empty,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
 
     val step = new EnvSecretsFeatureStep(kubernetesConf)
     val driverContainerWithEnvSecrets = 
step.configurePod(baseDriverPod).container

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index acdd07b..3a4e605 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -48,7 +48,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with 
BeforeAndAfter {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
   }
 
   test("Resolve to default local dir if neither env nor configuration are 
set") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index dad610c..18e3d77 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -44,7 +44,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
       Map.empty,
       Map.empty,
       Nil,
-      Seq.empty[String])
+      Seq.empty[String],
+      hadoopConfSpec = None)
 
     val step = new MountSecretsFeatureStep(kubernetesConf)
     val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index d309aa9..0d0a5fb 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -36,7 +36,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
     roleSecretEnvNamesToKeyRefs = Map.empty,
     roleEnvs = Map.empty,
     roleVolumes = Nil,
-    sparkFiles = Nil)
+    sparkFiles = Nil,
+    hadoopConfSpec = None)
 
   test("Mounts hostPath volumes") {
     val volumeConf = KubernetesVolumeSpec(

http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
index bf552ae..9172e0c 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala
@@ -43,7 +43,8 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
       roleSecretEnvNamesToKeyRefs = Map.empty,
       roleEnvs = Map.empty,
       roleVolumes = Nil,
-      sparkFiles = Seq.empty[String])
+      sparkFiles = Seq.empty[String],
+      hadoopConfSpec = None)
 
     val step = new JavaDriverFeatureStep(kubernetesConf)
     val driverPod = step.configurePod(baseDriverPod).pod


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to