http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 167fb40..a5ad972 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -21,57 +21,46 @@ import java.io.File
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 
 private[spark] class KubernetesDriverBuilder(
-    provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
+    provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) =
       new BasicDriverFeatureStep(_),
-    provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf])
-      => DriverKubernetesCredentialsFeatureStep =
+    provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) =
       new DriverKubernetesCredentialsFeatureStep(_),
-    provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => DriverServiceFeatureStep =
+    provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) =
       new DriverServiceFeatureStep(_),
-    provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
-      => MountSecretsFeatureStep) =
+    provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
       new MountSecretsFeatureStep(_),
-    provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
-      => EnvSecretsFeatureStep) =
+    provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
       new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
-      => LocalDirsFeatureStep =
+    provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
       new LocalDirsFeatureStep(_),
-    provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
-      => MountVolumesFeatureStep) =
+    provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
       new MountVolumesFeatureStep(_),
-    provideDriverCommandStep: (
-      KubernetesConf[KubernetesDriverSpecificConf]
-      => DriverCommandFeatureStep) =
+    provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) =
       new DriverCommandFeatureStep(_),
-    provideHadoopGlobalStep: (
-      KubernetesConf[KubernetesDriverSpecificConf]
-        => KerberosConfDriverFeatureStep) =
-    new KerberosConfDriverFeatureStep(_),
-    providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
-      => PodTemplateConfigMapStep) =
-    new PodTemplateConfigMapStep(_),
-    provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
+    provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) =
+      new KerberosConfDriverFeatureStep(_),
+    providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) =
+      new PodTemplateConfigMapStep(_),
+    provideInitialPod: () => SparkPod = () => SparkPod.initialPod) {
 
-  def buildFromFeatures(
-    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
+  def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = {
     val baseFeatures = Seq(
       provideBasicStep(kubernetesConf),
       provideCredentialsStep(kubernetesConf),
       provideServiceStep(kubernetesConf),
       provideLocalDirsStep(kubernetesConf))
 
-    val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+    val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
       Seq(provideSecretsStep(kubernetesConf))
     } else Nil
-    val envSecretFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+    val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
       Seq(provideEnvSecretsStep(kubernetesConf))
     } else Nil
-    val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
+    val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
       Seq(provideVolumesStep(kubernetesConf))
     } else Nil
     val podTemplateFeature = if (
@@ -81,14 +70,12 @@ private[spark] class KubernetesDriverBuilder(
 
     val driverCommandStep = provideDriverCommandStep(kubernetesConf)
 
-    val maybeHadoopConfigStep =
-      kubernetesConf.hadoopConfSpec.map { _ =>
-        provideHadoopGlobalStep(kubernetesConf)}
+    val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf))
 
     val allFeatures: Seq[KubernetesFeatureConfigStep] =
       baseFeatures ++ Seq(driverCommandStep) ++
         secretFeature ++ envSecretFeature ++ volumesFeature ++
-        maybeHadoopConfigStep.toSeq ++ podTemplateFeature
+        hadoopConfigStep ++ podTemplateFeature
 
     var spec = KubernetesDriverSpec(
       provideInitialPod(),

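A minimal sketch (not part of the patch) of how the simplified driver builder is
driven after this change, assuming a KubernetesDriverConf obtained elsewhere,
e.g. from KubernetesConf.createDriverConf or the KubernetesTestConf helper
introduced later in this commit:

    import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesDriverSpec}
    import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder

    // Every feature-step factory now takes the concrete KubernetesDriverConf,
    // so callers no longer thread KubernetesConf[KubernetesDriverSpecificConf].
    def buildSpec(conf: KubernetesDriverConf): KubernetesDriverSpec =
      new KubernetesDriverBuilder().buildFromFeatures(conf)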
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index fc41a47..d24ff0d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -26,50 +26,38 @@ import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.features._
 
 private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
-      => BasicExecutorFeatureStep =
+    provideBasicStep: (KubernetesExecutorConf => BasicExecutorFeatureStep) =
       new BasicExecutorFeatureStep(_),
-    provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
-      => MountSecretsFeatureStep =
+    provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
       new MountSecretsFeatureStep(_),
-    provideEnvSecretsStep:
-      (KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) =
+    provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
       new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
-      => LocalDirsFeatureStep =
+    provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
       new LocalDirsFeatureStep(_),
-    provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
-      => MountVolumesFeatureStep) =
+    provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
       new MountVolumesFeatureStep(_),
-    provideHadoopConfStep: (
-      KubernetesConf[KubernetesExecutorSpecificConf]
-      => HadoopConfExecutorFeatureStep) =
+    provideHadoopConfStep: (KubernetesExecutorConf => HadoopConfExecutorFeatureStep) =
       new HadoopConfExecutorFeatureStep(_),
-    provideKerberosConfStep: (
-      KubernetesConf[KubernetesExecutorSpecificConf]
-      => KerberosConfExecutorFeatureStep) =
+    provideKerberosConfStep: (KubernetesExecutorConf => KerberosConfExecutorFeatureStep) =
       new KerberosConfExecutorFeatureStep(_),
-    provideHadoopSparkUserStep: (
-      KubernetesConf[KubernetesExecutorSpecificConf]
-      => HadoopSparkUserExecutorFeatureStep) =
+    provideHadoopSparkUserStep: (KubernetesExecutorConf => HadoopSparkUserExecutorFeatureStep) =
       new HadoopSparkUserExecutorFeatureStep(_),
     provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
 
-  def buildFromFeatures(
-    kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
+  def buildFromFeatures(kubernetesConf: KubernetesExecutorConf): SparkPod = {
     val sparkConf = kubernetesConf.sparkConf
     val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
     val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
     val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)
 
     val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
-    val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
+    val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
       Seq(provideSecretsStep(kubernetesConf))
     } else Nil
-    val secretEnvFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
+    val secretEnvFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
       Seq(provideEnvSecretsStep(kubernetesConf))
     } else Nil
-    val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
+    val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
       Seq(provideVolumesStep(kubernetesConf))
     } else Nil
 

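The executor side follows the same pattern; a corresponding sketch (again not
part of the patch), assuming a KubernetesExecutorConf such as the one
KubernetesConf.createExecutorConf returns:

    import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
    import org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder

    // buildFromFeatures yields the fully configured executor SparkPod.
    def buildExecutorPod(conf: KubernetesExecutorConf): SparkPod =
      new KubernetesExecutorBuilder().buildFromFeatures(conf)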
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 41ca8d1..f4d40b0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -26,10 +26,6 @@ import org.apache.spark.deploy.k8s.submit._
 
 class KubernetesConfSuite extends SparkFunSuite {
 
-  private val APP_NAME = "test-app"
-  private val RESOURCE_NAME_PREFIX = "prefix"
-  private val APP_ID = "test-id"
-  private val MAIN_CLASS = "test-class"
   private val APP_ARGS = Array("arg1", "arg2")
   private val CUSTOM_LABELS = Map(
     "customLabel1Key" -> "customLabel1Value",
@@ -49,26 +45,6 @@ class KubernetesConfSuite extends SparkFunSuite {
   private val DRIVER_POD = new PodBuilder().build()
   private val EXECUTOR_ID = "executor-id"
 
-  test("Basic driver translated fields.") {
-    val sparkConf = new SparkConf(false)
-    val conf = KubernetesConf.createDriverConf(
-      sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppResource = JavaMainAppResource(None),
-      MAIN_CLASS,
-      APP_ARGS,
-      maybePyFiles = None,
-      hadoopConfDir = None)
-    assert(conf.appId === APP_ID)
-    assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap)
-    assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX)
-    assert(conf.roleSpecificConf.appName === APP_NAME)
-    assert(conf.roleSpecificConf.mainClass === MAIN_CLASS)
-    assert(conf.roleSpecificConf.appArgs === APP_ARGS)
-  }
-
   test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") {
     val sparkConf = new SparkConf(false)
       .set(MEMORY_OVERHEAD_FACTOR, 0.3)
@@ -90,22 +66,19 @@ class KubernetesConfSuite extends SparkFunSuite {
 
     val conf = KubernetesConf.createDriverConf(
       sparkConf,
-      APP_NAME,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      mainAppResource = JavaMainAppResource(None),
-      MAIN_CLASS,
+      KubernetesTestConf.APP_ID,
+      JavaMainAppResource(None),
+      KubernetesTestConf.MAIN_CLASS,
       APP_ARGS,
-      maybePyFiles = None,
-      hadoopConfDir = None)
-    assert(conf.roleLabels === Map(
-      SPARK_APP_ID_LABEL -> APP_ID,
+      None)
+    assert(conf.labels === Map(
+      SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
       CUSTOM_LABELS)
-    assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
-    assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
-    assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
-    assert(conf.roleEnvs === CUSTOM_ENVS)
+    assert(conf.annotations === CUSTOM_ANNOTATIONS)
+    assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+    assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
+    assert(conf.environment === CUSTOM_ENVS)
     assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3)
   }
 
@@ -113,20 +86,20 @@ class KubernetesConfSuite extends SparkFunSuite {
     val conf = KubernetesConf.createExecutorConf(
       new SparkConf(false),
       EXECUTOR_ID,
-      APP_ID,
+      KubernetesTestConf.APP_ID,
       Some(DRIVER_POD))
-    assert(conf.roleSpecificConf.executorId === EXECUTOR_ID)
-    assert(conf.roleSpecificConf.driverPod.get === DRIVER_POD)
+    assert(conf.executorId === EXECUTOR_ID)
+    assert(conf.driverPod.get === DRIVER_POD)
   }
 
   test("Image pull secrets.") {
     val conf = KubernetesConf.createExecutorConf(
       new SparkConf(false)
-        .set(IMAGE_PULL_SECRETS, "my-secret-1,my-secret-2 "),
+        .set(IMAGE_PULL_SECRETS, Seq("my-secret-1", "my-secret-2 ")),
       EXECUTOR_ID,
-      APP_ID,
+      KubernetesTestConf.APP_ID,
       Some(DRIVER_POD))
-    assert(conf.imagePullSecrets() ===
+    assert(conf.imagePullSecrets ===
       Seq(
         new LocalObjectReferenceBuilder().withName("my-secret-1").build(),
         new LocalObjectReferenceBuilder().withName("my-secret-2").build()))
@@ -150,14 +123,14 @@ class KubernetesConfSuite extends SparkFunSuite {
     val conf = KubernetesConf.createExecutorConf(
       sparkConf,
       EXECUTOR_ID,
-      APP_ID,
+      KubernetesTestConf.APP_ID,
       Some(DRIVER_POD))
-    assert(conf.roleLabels === Map(
+    assert(conf.labels === Map(
       SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID,
-      SPARK_APP_ID_LABEL -> APP_ID,
+      SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
       SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
-    assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
-    assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
-    assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
+    assert(conf.annotations === CUSTOM_ANNOTATIONS)
+    assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
+    assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
   }
 }

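A hedged sketch of the slimmer KubernetesConf.createDriverConf signature these
call sites imply (app name, resource-name prefix and hadoopConfDir drop out of
the explicit argument list; the trailing None mirrors the calls above):

    val conf = KubernetesConf.createDriverConf(
      sparkConf,
      KubernetesTestConf.APP_ID,
      JavaMainAppResource(None),
      KubernetesTestConf.MAIN_CLASS,
      APP_ARGS,
      None)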
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
new file mode 100644
index 0000000..1d77a6d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+
+/**
+ * Builder methods for KubernetesConf that allow easy control over what to return for a few
+ * properties. For use with tests instead of having to mock specific properties.
+ */
+object KubernetesTestConf {
+
+  val APP_ID = "appId"
+  val MAIN_CLASS = "mainClass"
+  val RESOURCE_PREFIX = "prefix"
+  val EXECUTOR_ID = "1"
+
+  private val DEFAULT_CONF = new SparkConf(false)
+
+  // scalastyle:off argcount
+  def createDriverConf(
+      sparkConf: SparkConf = DEFAULT_CONF,
+      appId: String = APP_ID,
+      mainAppResource: MainAppResource = JavaMainAppResource(None),
+      mainClass: String = MAIN_CLASS,
+      appArgs: Array[String] = Array.empty,
+      pyFiles: Seq[String] = Nil,
+      resourceNamePrefix: Option[String] = None,
+      labels: Map[String, String] = Map.empty,
+      environment: Map[String, String] = Map.empty,
+      annotations: Map[String, String] = Map.empty,
+      secretEnvNamesToKeyRefs: Map[String, String] = Map.empty,
+      secretNamesToMountPaths: Map[String, String] = Map.empty,
+      volumes: Seq[KubernetesVolumeSpec] = Seq.empty): KubernetesDriverConf = {
+    val conf = sparkConf.clone()
+
+    resourceNamePrefix.foreach { prefix =>
+      conf.set(KUBERNETES_DRIVER_POD_NAME_PREFIX, prefix)
+    }
+    setPrefixedConfigs(conf, KUBERNETES_DRIVER_LABEL_PREFIX, labels)
+    setPrefixedConfigs(conf, KUBERNETES_DRIVER_ENV_PREFIX, environment)
+    setPrefixedConfigs(conf, KUBERNETES_DRIVER_ANNOTATION_PREFIX, annotations)
+    setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRETS_PREFIX, secretNamesToMountPaths)
+    setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
+    setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)
+
+    new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, pyFiles)
+  }
+  // scalastyle:on argcount
+
+  def createExecutorConf(
+      sparkConf: SparkConf = DEFAULT_CONF,
+      driverPod: Option[Pod] = None,
+      labels: Map[String, String] = Map.empty,
+      environment: Map[String, String] = Map.empty,
+      annotations: Map[String, String] = Map.empty,
+      secretEnvNamesToKeyRefs: Map[String, String] = Map.empty,
+      secretNamesToMountPaths: Map[String, String] = Map.empty,
+      volumes: Seq[KubernetesVolumeSpec] = Seq.empty): KubernetesExecutorConf = {
+    val conf = sparkConf.clone()
+
+    setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_LABEL_PREFIX, labels)
+    setPrefixedConfigs(conf, "spark.executorEnv.", environment)
+    setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, annotations)
+    setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX, secretNamesToMountPaths)
+    setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
+    setVolumeSpecs(conf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX, volumes)
+
+    new KubernetesExecutorConf(conf, APP_ID, EXECUTOR_ID, driverPod)
+  }
+
+  private def setPrefixedConfigs(
+      conf: SparkConf,
+      prefix: String,
+      values: Map[String, String]): Unit = {
+    values.foreach { case (k, v) =>
+      conf.set(s"${prefix}$k", v)
+    }
+  }
+
+  private def setVolumeSpecs(
+      conf: SparkConf,
+      prefix: String,
+      volumes: Seq[KubernetesVolumeSpec]): Unit = {
+    def key(vtype: String, vname: String, subkey: String): String = {
+      s"${prefix}$vtype.$vname.$subkey"
+    }
+
+    volumes.foreach { case spec =>
+      val (vtype, configs) = spec.volumeConf match {
+        case KubernetesHostPathVolumeConf(path) =>
+          (KUBERNETES_VOLUMES_HOSTPATH_TYPE,
+            Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path))
+
+        case KubernetesPVCVolumeConf(claimName) =>
+          (KUBERNETES_VOLUMES_PVC_TYPE,
+            Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName))
+
+        case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
+          val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap
+          val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap
+          (KUBERNETES_VOLUMES_EMPTYDIR_TYPE, mconf ++ lconf)
+      }
+
+      conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY), spec.mountPath)
+      if (spec.mountSubPath.nonEmpty) {
+        conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY),
+          spec.mountSubPath)
+      }
+      conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY),
+        spec.mountReadOnly.toString)
+      configs.foreach { case (k, v) =>
+        conf.set(key(vtype, spec.volumeName, k), v)
+      }
+    }
+  }
+
+}

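A short usage sketch of the new helper, mirroring how the updated suites below
call it; every parameter has a default, so tests override only what they assert on:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.KubernetesTestConf

    // appId, mainClass, appArgs, etc. fall back to the constants defined on
    // the KubernetesTestConf object.
    val kconf = KubernetesTestConf.createDriverConf(
      sparkConf = new SparkConf(false),
      labels = Map("label1key" -> "label1value"))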
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
index de79a58..c079089 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -25,7 +25,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true")
     sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
     assert(volumeSpec.volumeName === "volumeName")
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly === true)
@@ -39,7 +39,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
     sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
     assert(volumeSpec.volumeName === "volumeName")
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountSubPath === "subPath")
@@ -51,7 +51,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
     sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimeName")

-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
     assert(volumeSpec.volumeName === "volumeName")
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly === true)
@@ -66,7 +66,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     sparkConf.set("test.emptyDir.volumeName.options.medium", "medium")
     sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5G")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
     assert(volumeSpec.volumeName === "volumeName")
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly === true)
@@ -79,7 +79,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
     sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
     assert(volumeSpec.volumeName === "volumeName")
     assert(volumeSpec.mountPath === "/path")
     assert(volumeSpec.mountReadOnly === true)
@@ -92,27 +92,29 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
     sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
     sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
+    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
     assert(volumeSpec.mountReadOnly === false)
   }
 
-  test("Gracefully fails on missing mount key") {
+  test("Fails on missing mount key") {
     val sparkConf = new SparkConf(false)
     sparkConf.set("test.emptyDir.volumeName.mnt.path", "/path")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
-    assert(volumeSpec.isFailure === true)
-    assert(volumeSpec.failed.get.getMessage === "emptyDir.volumeName.mount.path")
+    val e = intercept[NoSuchElementException] {
+      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+    }
+    assert(e.getMessage.contains("emptyDir.volumeName.mount.path"))
   }
 
-  test("Gracefully fails on missing option key") {
+  test("Fails on missing option key") {
     val sparkConf = new SparkConf(false)
     sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
     sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true")
     sparkConf.set("test.hostPath.volumeName.options.pth", "/hostPath")
 
-    val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
-    assert(volumeSpec.isFailure === true)
-    assert(volumeSpec.failed.get.getMessage === "hostPath.volumeName.options.path")
+    val e = intercept[NoSuchElementException] {
+      KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
+    }
+    assert(e.getMessage.contains("hostPath.volumeName.options.path"))
   }
 }

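The new parsing contract, as these test changes show it: parseVolumesWithPrefix
returns the specs directly, and a missing key now surfaces as a thrown
NoSuchElementException instead of a Try failure. A sketch:

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.k8s.{KubernetesVolumeSpec, KubernetesVolumeUtils}

    val conf = new SparkConf(false)
      .set("test.hostPath.volumeName.mount.path", "/path")
      .set("test.hostPath.volumeName.options.path", "/hostPath")

    // No more .head.get on a Seq[Try[...]]; malformed configs throw instead.
    val specs: Seq[KubernetesVolumeSpec] =
      KubernetesVolumeUtils.parseVolumesWithPrefix(conf, "test.")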
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 1e7dfbe..e4951bc 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
@@ -30,32 +30,17 @@ import org.apache.spark.ui.SparkUI
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
-  private val APP_ID = "spark-app-id"
-  private val RESOURCE_NAME_PREFIX = "spark"
   private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
   private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
-  private val APP_NAME = "spark-test"
-  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
-  private val PY_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
-  private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
-  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
-  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
-  private val DRIVER_ANNOTATIONS = Map(CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE)
-  private val DRIVER_CUSTOM_ENV1 = "customDriverEnv1"
-  private val DRIVER_CUSTOM_ENV2 = "customDriverEnv2"
+  private val DRIVER_ANNOTATIONS = Map("customAnnotation" -> "customAnnotationValue")
   private val DRIVER_ENVS = Map(
-    DRIVER_CUSTOM_ENV1 -> DRIVER_CUSTOM_ENV1,
-    DRIVER_CUSTOM_ENV2 -> DRIVER_CUSTOM_ENV2)
+    "customDriverEnv1" -> "customDriverEnv2",
+    "customDriverEnv2" -> "customDriverEnv2")
   private val TEST_IMAGE_PULL_SECRETS = Seq("my-secret-1", "my-secret-2")
   private val TEST_IMAGE_PULL_SECRET_OBJECTS =
     TEST_IMAGE_PULL_SECRETS.map { secret =>
       new LocalObjectReferenceBuilder().withName(secret).build()
     }
-  private val emptyDriverSpecificConf = KubernetesDriverSpecificConf(
-    JavaMainAppResource(None),
-    APP_NAME,
-    MAIN_CLASS,
-    APP_ARGS)
 
   test("Check the pod respects all configurations from the user.") {
     val sparkConf = new SparkConf()
@@ -65,19 +50,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(DRIVER_MEMORY.key, "256M")
       .set(DRIVER_MEMORY_OVERHEAD, 200L)
       .set(CONTAINER_IMAGE, "spark-driver:latest")
-      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      emptyDriverSpecificConf,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      DRIVER_LABELS,
-      DRIVER_ANNOTATIONS,
-      Map.empty,
-      Map.empty,
-      DRIVER_ENVS,
-      Nil,
-      hadoopConfSpec = None)
+      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = DRIVER_LABELS,
+      environment = DRIVER_ENVS,
+      annotations = DRIVER_ANNOTATIONS)
 
     val featureStep = new BasicDriverFeatureStep(kubernetesConf)
     val basePod = SparkPod.initialPod()
@@ -99,10 +77,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val envs = configuredPod.container
       .getEnv
       .asScala
-      .map(env => (env.getName, env.getValue))
+      .map { env => (env.getName, env.getValue) }
       .toMap
-    assert(envs(DRIVER_CUSTOM_ENV1) === DRIVER_ENVS(DRIVER_CUSTOM_ENV1))
-    assert(envs(DRIVER_CUSTOM_ENV2) === DRIVER_ENVS(DRIVER_CUSTOM_ENV2))
+    DRIVER_ENVS.foreach { case (k, v) =>
+      assert(envs(v) === v)
+    }
 
     assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala ===
       TEST_IMAGE_PULL_SECRET_OBJECTS)
@@ -122,13 +101,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
 
     val driverPodMetadata = configuredPod.pod.getMetadata
     assert(driverPodMetadata.getName === "spark-driver-pod")
-    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
+    DRIVER_LABELS.foreach { case (k, v) =>
+      assert(driverPodMetadata.getLabels.get(k) === v)
+    }
     assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS)
     assert(configuredPod.pod.getSpec.getRestartPolicy === "Never")
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
-      "spark.app.id" -> APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.app.id" -> KubernetesTestConf.APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
       "spark.kubernetes.submitInDriver" -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
     assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf)
@@ -141,39 +122,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     val pythonSparkConf = new SparkConf()
       .set(DRIVER_MEMORY.key, "4g")
       .set(CONTAINER_IMAGE, "spark-driver-py:latest")
-    val javaKubernetesConf = KubernetesConf(
-      javaSparkConf,
-      KubernetesDriverSpecificConf(
-        JavaMainAppResource(None),
-        APP_NAME,
-        PY_MAIN_CLASS,
-        APP_ARGS),
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      DRIVER_LABELS,
-      DRIVER_ANNOTATIONS,
-      Map.empty,
-      Map.empty,
-      DRIVER_ENVS,
-      Nil,
-      hadoopConfSpec = None)
-
-    val pythonKubernetesConf = KubernetesConf(
-      pythonSparkConf,
-      KubernetesDriverSpecificConf(
-        PythonMainAppResource(""),
-        APP_NAME,
-        PY_MAIN_CLASS,
-        APP_ARGS),
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      DRIVER_LABELS,
-      DRIVER_ANNOTATIONS,
-      Map.empty,
-      Map.empty,
-      DRIVER_ENVS,
-      Nil,
-      hadoopConfSpec = None)
+    val javaKubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = javaSparkConf)
+    val pythonKubernetesConf = KubernetesTestConf.createDriverConf(
+      sparkConf = pythonSparkConf,
+      mainAppResource = PythonMainAppResource(""))
     val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf)
     val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf)
     val basePod = SparkPod.initialPod()
@@ -191,25 +143,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .setJars(allJars)
       .set("spark.files", allFiles.mkString(","))
       .set(CONTAINER_IMAGE, "spark-driver:latest")
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      emptyDriverSpecificConf,
-      RESOURCE_NAME_PREFIX,
-      APP_ID,
-      DRIVER_LABELS,
-      DRIVER_ANNOTATIONS,
-      Map.empty,
-      Map.empty,
-      DRIVER_ENVS,
-      Nil,
-      hadoopConfSpec = None)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
 
     val step = new BasicDriverFeatureStep(kubernetesConf)
     val additionalProperties = step.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(
       KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
-      "spark.app.id" -> APP_ID,
-      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX,
+      "spark.app.id" -> KubernetesTestConf.APP_ID,
+      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
       "spark.kubernetes.submitInDriver" -> "true",
       "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
       "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
@@ -234,19 +175,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
         .set(CONTAINER_IMAGE, "spark-driver:latest")
         .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m")
       factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) }
-      val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource)
-      val conf = KubernetesConf(
-        sparkConf,
-        driverConf,
-        RESOURCE_NAME_PREFIX,
-        APP_ID,
-        DRIVER_LABELS,
-        DRIVER_ANNOTATIONS,
-        Map.empty,
-        Map.empty,
-        DRIVER_ENVS,
-        Nil,
-        hadoopConfSpec = None)
+      val conf = KubernetesTestConf.createDriverConf(
+        sparkConf = sparkConf,
+        mainAppResource = resource)
       val step = new BasicDriverFeatureStep(conf)
       val pod = step.configurePod(SparkPod.initialPod())
       val mem = pod.container.getResources.getRequests.get("memory").getAmount()

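One side effect visible throughout these tests: IMAGE_PULL_SECRETS is now typed
as a sequence config, so suites pass a Seq instead of a comma-joined string. The
before/after, lifted from the hunk above:

    // before
    sparkConf.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
    // after
    sparkConf.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)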
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index e9a16aa..d6003c9 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -19,20 +19,18 @@ package org.apache.spark.deploy.k8s.features
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
-import org.mockito.MockitoAnnotations
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
+import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
-class BasicExecutorFeatureStepSuite
-  extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
+class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
-  private val APP_ID = "app-id"
   private val DRIVER_HOSTNAME = "localhost"
   private val DRIVER_PORT = 7098
   private val DRIVER_ADDRESS = RpcEndpointAddress(
@@ -45,7 +43,6 @@ class BasicExecutorFeatureStepSuite
   private val RESOURCE_NAME_PREFIX = "base"
   private val EXECUTOR_IMAGE = "executor-image"
   private val LABELS = Map("label1key" -> "label1value")
-  private val ANNOTATIONS = Map("annotation1key" -> "annotation1value")
   private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2")
   private val TEST_IMAGE_PULL_SECRET_OBJECTS =
     TEST_IMAGE_PULL_SECRETS.map { secret =>
@@ -66,37 +63,35 @@ class BasicExecutorFeatureStepSuite
   private var baseConf: SparkConf = _
 
   before {
-    MockitoAnnotations.initMocks(this)
     baseConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
       .set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
       .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
-      .set("spark.driver.host", DRIVER_HOSTNAME)
+      .set(DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
       .set("spark.driver.port", DRIVER_PORT.toString)
-      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
+      .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
       .set("spark.kubernetes.resource.type", "java")
   }
 
+  private def newExecutorConf(
+      environment: Map[String, String] = Map.empty): KubernetesExecutorConf = {
+    KubernetesTestConf.createExecutorConf(
+      sparkConf = baseConf,
+      driverPod = Some(DRIVER_POD),
+      labels = LABELS,
+      environment = environment)
+  }
+
   test("basic executor pod has reasonable defaults") {
-    val step = new BasicExecutorFeatureStep(
-      KubernetesConf(
-        baseConf,
-        KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
-        RESOURCE_NAME_PREFIX,
-        APP_ID,
-        LABELS,
-        ANNOTATIONS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None))
+    val step = new BasicExecutorFeatureStep(newExecutorConf())
     val executor = step.configurePod(SparkPod.initialPod())
 
     // The executor pod name and default labels.
     assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1")
-    assert(executor.pod.getMetadata.getLabels.asScala === LABELS)
+    LABELS.foreach { case (k, v) =>
+      assert(executor.pod.getMetadata.getLabels.get(k) === v)
+    }
     assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS)

     // There is exactly 1 container with no volume mounts and default memory limits.
@@ -116,43 +111,18 @@ class BasicExecutorFeatureStepSuite
   }
 
   test("executor pod hostnames get truncated to 63 characters") {
-    val conf = baseConf.clone()
     val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple"
 
-    val step = new BasicExecutorFeatureStep(
-      KubernetesConf(
-        conf,
-        KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
-        longPodNamePrefix,
-        APP_ID,
-        LABELS,
-        ANNOTATIONS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None))
+    baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix)
+    val step = new BasicExecutorFeatureStep(newExecutorConf())
     assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
   }
 
   test("classpath and extra java options get translated into environment variables") {
-    val conf = baseConf.clone()
-    conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
-    conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
-
-    val step = new BasicExecutorFeatureStep(
-      KubernetesConf(
-        conf,
-        KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
-        RESOURCE_NAME_PREFIX,
-        APP_ID,
-        LABELS,
-        ANNOTATIONS,
-        Map.empty,
-        Map.empty,
-        Map("qux" -> "quux"),
-        Nil,
-        hadoopConfSpec = None))
+    baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
+    baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz")
+    val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
+    val step = new BasicExecutorFeatureStep(kconf)
     val executor = step.configurePod(SparkPod.initialPod())
 
     checkEnv(executor,
@@ -163,23 +133,10 @@ class BasicExecutorFeatureStepSuite
   }
 
   test("test executor pyspark memory") {
-    val conf = baseConf.clone()
-    conf.set("spark.kubernetes.resource.type", "python")
-    conf.set(org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY, 42L)
-
-    val step = new BasicExecutorFeatureStep(
-      KubernetesConf(
-        conf,
-        KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
-        RESOURCE_NAME_PREFIX,
-        APP_ID,
-        LABELS,
-        ANNOTATIONS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None))
+    baseConf.set("spark.kubernetes.resource.type", "python")
+    baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L)
+
+    val step = new BasicExecutorFeatureStep(newExecutorConf())
     val executor = step.configurePod(SparkPod.initialPod())
     // This is checking that basic executor + executorMemory = 1408 + 42 = 1450
     assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
@@ -199,7 +156,7 @@ class BasicExecutorFeatureStepSuite
       ENV_DRIVER_URL -> DRIVER_ADDRESS.toString,
       ENV_EXECUTOR_CORES -> "1",
       ENV_EXECUTOR_MEMORY -> "1g",
-      ENV_APPLICATION_ID -> APP_ID,
+      ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
       ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
 

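The recurring pattern in this suite replaces hand-built KubernetesConf instances
with mutation of baseConf plus a small helper; a condensed sketch, assuming the
newExecutorConf helper defined above:

    baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
    val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
    val executor = new BasicExecutorFeatureStep(kconf).configurePod(SparkPod.initialPod())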
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
index 3067295..f74ac92 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala
@@ -27,8 +27,6 @@ import org.apache.spark.util.Utils
 
 class DriverCommandFeatureStepSuite extends SparkFunSuite {
 
-  private val MAIN_CLASS = "mainClass"
-
   test("java resource") {
     val mainResource = "local:///main.jar"
     val spec = applyFeatureStep(
@@ -37,7 +35,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
     assert(spec.pod.container.getArgs.asScala === List(
       "driver",
       "--properties-file", SPARK_CONF_PATH,
-      "--class", MAIN_CLASS,
+      "--class", KubernetesTestConf.MAIN_CLASS,
       "spark-internal", "5", "7"))
 
     val jars = Utils.stringToSeq(spec.systemProperties("spark.jars"))
@@ -55,7 +53,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
     assert(spec.pod.container.getArgs.asScala === List(
       "driver",
       "--properties-file", SPARK_CONF_PATH,
-      "--class", MAIN_CLASS,
+      "--class", KubernetesTestConf.MAIN_CLASS,
       "/main.py"))
     val envs = spec.pod.container.getEnv.asScala
       .map { env => (env.getName, env.getValue) }
@@ -86,7 +84,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
     assert(spec.pod.container.getArgs.asScala === List(
       "driver",
       "--properties-file", SPARK_CONF_PATH,
-      "--class", MAIN_CLASS,
+      "--class", KubernetesTestConf.MAIN_CLASS,
       "/main.py", "5", "7", "9"))
 
     val envs = spec.pod.container.getEnv.asScala
@@ -112,7 +110,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
     assert(spec.pod.container.getArgs.asScala === List(
       "driver",
       "--properties-file", SPARK_CONF_PATH,
-      "--class", MAIN_CLASS,
+      "--class", KubernetesTestConf.MAIN_CLASS,
       "/main.R", "5", "7", "9"))
   }
 
@@ -121,20 +119,11 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
       conf: SparkConf = new SparkConf(false),
       appArgs: Array[String] = Array(),
       pyFiles: Seq[String] = Nil): KubernetesDriverSpec = {
-    val driverConf = new KubernetesDriverSpecificConf(
-      resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles)
-    val kubernetesConf = KubernetesConf(
-      conf,
-      driverConf,
-      "resource-prefix",
-      "appId",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(
+      sparkConf = conf,
+      mainAppResource = resource,
+      appArgs = appArgs,
+      pyFiles = pyFiles)
     val step = new DriverCommandFeatureStep(kubernetesConf)
     val pod = step.configurePod(SparkPod.initialPod())
     val props = step.getAdditionalPodSystemProperties()

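For reference, a sketch of how this suite now exercises the step through its
applyFeatureStep helper (argument values are illustrative, inferred from the
assertions above):

    val spec = applyFeatureStep(
      JavaMainAppResource(Some("local:///main.jar")),
      appArgs = Array("5", "7"))
    assert(spec.pod.container.getArgs.asScala.contains(KubernetesTestConf.MAIN_CLASS))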
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
index 36c6616..7d8e929 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala
@@ -18,51 +18,25 @@ package org.apache.spark.deploy.k8s.features
 
 import java.io.File
 
+import scala.collection.JavaConverters._
+
 import com.google.common.base.Charsets
 import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret}
-import org.mockito.{Mock, MockitoAnnotations}
-import org.scalatest.BeforeAndAfter
-import scala.collection.JavaConverters._
+import io.fabric8.kubernetes.api.model.Secret
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.util.Utils
 
-class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite {
 
-  private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark"
-  private val APP_ID = "k8s-app"
-  private var credentialsTempDirectory: File = _
+  private val credentialsTempDirectory = Utils.createTempDir()
   private val BASE_DRIVER_POD = SparkPod.initialPod()
 
-  @Mock
-  private var driverSpecificConf: KubernetesDriverSpecificConf = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    credentialsTempDirectory = Utils.createTempDir()
-  }
-
-  after {
-    credentialsTempDirectory.delete()
-  }
-
   test("Don't set any credentials") {
-    val kubernetesConf = KubernetesConf(
-      new SparkConf(false),
-      driverSpecificConf,
-      KUBERNETES_RESOURCE_NAME_PREFIX,
-      APP_ID,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
+    val kubernetesConf = KubernetesTestConf.createDriverConf()
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
     assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty)
@@ -83,19 +57,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       .set(
         s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
         "/mnt/secrets/my-ca.pem")
-    val kubernetesConf = KubernetesConf(
-      submissionSparkConf,
-      driverSpecificConf,
-      KUBERNETES_RESOURCE_NAME_PREFIX,
-      APP_ID,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
-
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = submissionSparkConf)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
     assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty)
@@ -122,18 +84,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       .set(
         s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
         caCertFile.getAbsolutePath)
-    val kubernetesConf = KubernetesConf(
-      submissionSparkConf,
-      driverSpecificConf,
-      KUBERNETES_RESOURCE_NAME_PREFIX,
-      APP_ID,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = submissionSparkConf)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
     val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
     val expectedSparkConf = Map(
@@ -153,7 +104,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
       .head
       .asInstanceOf[Secret]
     assert(credentialsSecret.getMetadata.getName ===
-      s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials")
+      s"${kubernetesConf.resourceNamePrefix}-kubernetes-credentials")
     val decodedSecretData = credentialsSecret.getData.asScala.map { data =>
       (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8))
     }

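A pattern worth noting in this and the following suite: tests no longer pin a
resource-name prefix but read back kubernetesConf.resourceNamePrefix, which the
new KubernetesDriverConf computes itself (KubernetesTestConf can force it via
its resourceNamePrefix parameter). A sketch:

    val kconf = KubernetesTestConf.createDriverConf(sparkConf = submissionSparkConf)
    val expectedSecretName = s"${kconf.resourceNamePrefix}-kubernetes-credentials"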
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 3c46667..0452789 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -16,24 +16,19 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.Service
-import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.Mockito.when
-import org.scalatest.BeforeAndAfter
 import scala.collection.JavaConverters._
 
+import io.fabric8.kubernetes.api.model.Service
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
-import org.apache.spark.util.Clock
-
-class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+import org.apache.spark.internal.config._
+import org.apache.spark.util.ManualClock
 
-  private val SHORT_RESOURCE_NAME_PREFIX =
-    "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH -
-      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length)
+class DriverServiceFeatureStepSuite extends SparkFunSuite {
 
   private val LONG_RESOURCE_NAME_PREFIX =
     "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH -
@@ -42,34 +37,14 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     "label1key" -> "label1value",
     "label2key" -> "label2value")
 
-  @Mock
-  private var clock: Clock = _
-
-  private var sparkConf: SparkConf = _
-
-  before {
-    MockitoAnnotations.initMocks(this)
-    sparkConf = new SparkConf(false)
-  }
-
   test("Headless service has a port for the driver RPC and the block manager.") {
-    sparkConf = sparkConf
+    val sparkConf = new SparkConf(false)
       .set("spark.driver.port", "9000")
-      .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
-    val configurationStep = new DriverServiceFeatureStep(
-      KubernetesConf(
-        sparkConf,
-        KubernetesDriverSpecificConf(
-          JavaMainAppResource(None), "main", "app", Seq.empty),
-        SHORT_RESOURCE_NAME_PREFIX,
-        "app-id",
-        DRIVER_LABELS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None))
+      .set(DRIVER_BLOCK_MANAGER_PORT, 8080)
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = DRIVER_LABELS)
+    val configurationStep = new DriverServiceFeatureStep(kconf)
     assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
     assert(configurationStep.getAdditionalKubernetesResources().size === 1)
     assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service])
@@ -80,50 +55,28 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     verifyService(
       9000,
       8080,
-      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
       driverService)
   }
 
   test("Hostname and ports are set according to the service name.") {
-    val configurationStep = new DriverServiceFeatureStep(
-      KubernetesConf(
-        sparkConf
-          .set("spark.driver.port", "9000")
-          .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
-          .set(KUBERNETES_NAMESPACE, "my-namespace"),
-        KubernetesDriverSpecificConf(
-          JavaMainAppResource(None), "main", "app", Seq.empty),
-        SHORT_RESOURCE_NAME_PREFIX,
-        "app-id",
-        DRIVER_LABELS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None))
-    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
-      DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
+    val sparkConf = new SparkConf(false)
+      .set("spark.driver.port", "9000")
+      .set(DRIVER_BLOCK_MANAGER_PORT, 8080)
+      .set(KUBERNETES_NAMESPACE, "my-namespace")
+    val kconf = KubernetesTestConf.createDriverConf(
+      sparkConf = sparkConf,
+      labels = DRIVER_LABELS)
+    val configurationStep = new DriverServiceFeatureStep(kconf)
+    val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
     val expectedHostName = s"$expectedServiceName.my-namespace.svc"
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
     verifySparkConfHostNames(additionalProps, expectedHostName)
   }
 
   test("Ports should resolve to defaults in SparkConf and in the service.") {
-    val configurationStep = new DriverServiceFeatureStep(
-      KubernetesConf(
-        sparkConf,
-        KubernetesDriverSpecificConf(
-          JavaMainAppResource(None), "main", "app", Seq.empty),
-        SHORT_RESOURCE_NAME_PREFIX,
-        "app-id",
-        DRIVER_LABELS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None))
+    val kconf = KubernetesTestConf.createDriverConf(labels = DRIVER_LABELS)
+    val configurationStep = new DriverServiceFeatureStep(kconf)
     val resolvedService = configurationStep
       .getAdditionalKubernetesResources()
       .head
@@ -131,30 +84,23 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     verifyService(
       DEFAULT_DRIVER_PORT,
       DEFAULT_BLOCKMANAGER_PORT,
-      s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
+      s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
       resolvedService)
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
     assert(additionalProps("spark.driver.port") === 
DEFAULT_DRIVER_PORT.toString)
-    
assert(additionalProps(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key)
-      === DEFAULT_BLOCKMANAGER_PORT.toString)
+    assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === 
DEFAULT_BLOCKMANAGER_PORT.toString)
   }
 
   test("Long prefixes should switch to using a generated name.") {
-    when(clock.getTimeMillis()).thenReturn(10000)
+    val clock = new ManualClock()
+    clock.setTime(10000)
+    val sparkConf = new SparkConf(false)
+      .set(KUBERNETES_NAMESPACE, "my-namespace")
     val configurationStep = new DriverServiceFeatureStep(
-      KubernetesConf(
-        sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
-        KubernetesDriverSpecificConf(
-          JavaMainAppResource(None), "main", "app", Seq.empty),
-        LONG_RESOURCE_NAME_PREFIX,
-        "app-id",
-        DRIVER_LABELS,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Map.empty,
-        Nil,
-        hadoopConfSpec = None),
+      KubernetesTestConf.createDriverConf(
+        sparkConf = sparkConf,
+        resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
+        labels = DRIVER_LABELS),
       clock)
     val driverService = configurationStep
       .getAdditionalKubernetesResources()
@@ -168,56 +114,27 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Disallow bind address and driver host to be set explicitly.") {
-    try {
-      new DriverServiceFeatureStep(
-        KubernetesConf(
-          sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
-          KubernetesDriverSpecificConf(
-            JavaMainAppResource(None), "main", "app", Seq.empty),
-          LONG_RESOURCE_NAME_PREFIX,
-          "app-id",
-          DRIVER_LABELS,
-          Map.empty,
-          Map.empty,
-          Map.empty,
-          Map.empty,
-          Nil,
-          hadoopConfSpec = None),
-        clock)
-      fail("The driver bind address should not be allowed.")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage ===
-          s"requirement failed: 
${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" +
-          " not supported in Kubernetes mode, as the driver's bind address is 
managed" +
-          " and set to the driver pod's IP address.")
+    val sparkConf = new SparkConf(false)
+      .set(DRIVER_BIND_ADDRESS, "host")
+      .set("spark.app.name", LONG_RESOURCE_NAME_PREFIX)
+    val e1 = intercept[IllegalArgumentException] {
+      new DriverServiceFeatureStep(KubernetesTestConf.createDriverConf(sparkConf = sparkConf))
     }
-    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
-    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
-    try {
-      new DriverServiceFeatureStep(
-        KubernetesConf(
-          sparkConf,
-          KubernetesDriverSpecificConf(
-            JavaMainAppResource(None), "main", "app", Seq.empty),
-          LONG_RESOURCE_NAME_PREFIX,
-          "app-id",
-          DRIVER_LABELS,
-          Map.empty,
-          Map.empty,
-          Map.empty,
-          Map.empty,
-          Nil,
-          hadoopConfSpec = None),
-        clock)
-      fail("The driver host address should not be allowed.")
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage ===
-          s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} 
is" +
-          " not supported in Kubernetes mode, as the driver's hostname will be 
managed via" +
-          " a Kubernetes service.")
+    assert(e1.getMessage ===
+      s"requirement failed: 
${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" +
+      " not supported in Kubernetes mode, as the driver's bind address is 
managed" +
+      " and set to the driver pod's IP address.")
+
+    sparkConf.remove(DRIVER_BIND_ADDRESS)
+    sparkConf.set(DRIVER_HOST_ADDRESS, "host")
+
+    val e2 = intercept[IllegalArgumentException] {
+      new DriverServiceFeatureStep(KubernetesTestConf.createDriverConf(sparkConf = sparkConf))
     }
+    assert(e2.getMessage ===
+      s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} is" +
+      " not supported in Kubernetes mode, as the driver's hostname will be 
managed via" +
+      " a Kubernetes service.")
   }
 
   private def verifyService(
@@ -227,7 +144,9 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       service: Service): Unit = {
     assert(service.getMetadata.getName === expectedServiceName)
     assert(service.getSpec.getClusterIP === "None")
-    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
+    DRIVER_LABELS.foreach { case (k, v) =>
+      assert(service.getSpec.getSelector.get(k) === v)
+    }
     assert(service.getSpec.getPorts.size() === 2)
     val driverServicePorts = service.getSpec.getPorts.asScala
     assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)

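The KubernetesTestConf factory that replaces the hand-built KubernetesConf instances above is introduced elsewhere in this commit. As rough orientation only, a minimal self-contained sketch of such a factory, with a simplified stand-in conf type and assumed defaults rather than the real KubernetesDriverConf:

    import org.apache.spark.SparkConf

    // Simplified stand-in for the real KubernetesDriverConf, which carries
    // more fields (annotations, env, volumes, hadoop conf, ...).
    case class TestDriverConf(
        sparkConf: SparkConf,
        appId: String,
        resourceNamePrefix: String,
        labels: Map[String, String])

    object TestKubernetesConf {
      // Centralizing the defaults lets each suite override only the fields it
      // actually exercises, instead of spelling out ten constructor arguments.
      def createDriverConf(
          sparkConf: SparkConf = new SparkConf(false),
          resourceNamePrefix: Option[String] = None,
          labels: Map[String, String] = Map.empty): TestDriverConf = {
        TestDriverConf(
          sparkConf = sparkConf,
          appId = "appId",
          resourceNamePrefix = resourceNamePrefix.getOrElse("prefix"),
          labels = labels)
      }
    }
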
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
index 3d25307..0455526 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala
@@ -16,12 +16,12 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.PodBuilder
+import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s._
 
-class EnvSecretsFeatureStepSuite extends SparkFunSuite{
+class EnvSecretsFeatureStepSuite extends SparkFunSuite {
   private val KEY_REF_NAME_FOO = "foo"
   private val KEY_REF_NAME_BAR = "bar"
   private val KEY_REF_KEY_FOO = "key_foo"
@@ -34,28 +34,14 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{
     val envVarsToKeys = Map(
       ENV_NAME_BAR -> s"${KEY_REF_NAME_BAR}:${KEY_REF_KEY_BAR}",
       ENV_NAME_FOO -> s"${KEY_REF_NAME_FOO}:${KEY_REF_KEY_FOO}")
-    val sparkConf = new SparkConf(false)
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesExecutorSpecificConf("1", Some(new PodBuilder().build())),
-      "resource-name-prefix",
-      "app-id",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      envVarsToKeys,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(
+      secretEnvNamesToKeyRefs = envVarsToKeys)
 
     val step = new EnvSecretsFeatureStep(kubernetesConf)
-    val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container
-
-    val expectedVars =
-      Seq(s"${ENV_NAME_BAR}", s"${ENV_NAME_FOO}")
-
-    expectedVars.foreach { envName =>
-      assert(KubernetesFeaturesTestUtils.containerHasEnvVar(driverContainerWithEnvSecrets, envName))
+    val container = step.configurePod(baseDriverPod).container
+    val containerEnvKeys = container.getEnv.asScala.map { v => v.getName }.toSet
+    envVarsToKeys.keys.foreach { envName =>
+      assert(containerEnvKeys.contains(envName))
     }
   }
 }

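The rewritten test only asserts that each configured env var name ends up on the container. For context, a hedged sketch of how a step can turn the "ENV_NAME -> secretName:secretKey" entries into secret-backed env vars with the fabric8 builders (the helper name is ours, not Spark's):

    import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

    // Each value is "<secretName>:<secretKey>"; emit an env var whose value
    // is a Kubernetes secretKeyRef instead of a literal string.
    def buildSecretEnvVars(secretEnvNamesToKeyRefs: Map[String, String]): Seq[EnvVar] =
      secretEnvNamesToKeyRefs.toSeq.map { case (envName, keyRef) =>
        val Array(secretName, secretKey) = keyRef.split(":")
        new EnvVarBuilder()
          .withName(envName)
          .withNewValueFrom()
            .withNewSecretKeyRef()
              .withName(secretName)
              .withKey(secretKey)
            .endSecretKeyRef()
          .endValueFrom()
          .build()
      }
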
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
index 894d824..8f34ce5 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -17,45 +17,19 @@
 package org.apache.spark.deploy.k8s.features
 
 import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
-import org.mockito.Mockito
-import org.scalatest._
-import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
+import org.apache.spark.util.SparkConfWithEnv
 
-class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
+class LocalDirsFeatureStepSuite extends SparkFunSuite {
   private val defaultLocalDir = "/var/data/default-local-dir"
-  private var sparkConf: SparkConf = _
-  private var kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
-
-  before {
-    val realSparkConf = new SparkConf(false)
-    sparkConf = Mockito.spy(realSparkConf)
-    kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        JavaMainAppResource(None),
-        "app-name",
-        "main",
-        Seq.empty),
-      "resource",
-      "app-id",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
-  }
 
   test("Resolve to default local dir if neither env nor configuration are 
set") {
-    Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
-    Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
-    val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
+    val stepUnderTest = new LocalDirsFeatureStep(KubernetesTestConf.createDriverConf(),
+      defaultLocalDir)
     val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
     assert(configuredPod.pod.getSpec.getVolumes.size === 1)
     assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
@@ -79,8 +53,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use configured local dirs split on comma if provided.") {
-    Mockito.doReturn("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
-      .when(sparkConf).getenv("SPARK_LOCAL_DIRS")
+    val sparkConf = new SparkConfWithEnv(Map(
+      "SPARK_LOCAL_DIRS" -> 
"/var/data/my-local-dir-1,/var/data/my-local-dir-2"))
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = 
sparkConf)
     val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, 
defaultLocalDir)
     val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
     assert(configuredPod.pod.getSpec.getVolumes.size === 2)
@@ -116,9 +91,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("Use tmpfs to back default local dir") {
-    Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
-    Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
-    Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS)
+    val sparkConf = new SparkConf(false).set(KUBERNETES_LOCAL_DIRS_TMPFS, true)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
     val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
     val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
     assert(configuredPod.pod.getSpec.getVolumes.size === 1)

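SparkConfWithEnv replaces the Mockito spy that previously stubbed getenv. A plausible sketch of such a test utility, assuming getenv is package-private and overridable from within the org.apache.spark namespace, and that clones should stay env-aware:

    package org.apache.spark.util

    import org.apache.spark.SparkConf

    // Test-only conf whose environment lookups come from a supplied map rather
    // than the real process environment; placed under org.apache.spark so it
    // can override the package-private getenv.
    class SparkConfWithEnv(env: Map[String, String]) extends SparkConf(false) {
      override def getenv(name: String): String = env.getOrElse(name, null)

      // Keep clones env-aware as well, since SparkConf is cloned internally.
      override def clone: SparkConf = new SparkConfWithEnv(env).setAll(getAll)
    }
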
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
index 1555f6a..22f6d26 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import io.fabric8.kubernetes.api.model.PodBuilder
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SecretVolumeUtils, SparkPod}
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.{KubernetesTestConf, SecretVolumeUtils, SparkPod}
 
 class MountSecretsFeatureStepSuite extends SparkFunSuite {
 
@@ -32,19 +30,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite {
     val secretNamesToMountPaths = Map(
       SECRET_FOO -> SECRET_MOUNT_PATH,
       SECRET_BAR -> SECRET_MOUNT_PATH)
-    val sparkConf = new SparkConf(false)
-    val kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesExecutorSpecificConf("1", Some(new PodBuilder().build())),
-      "resource-name-prefix",
-      "app-id",
-      Map.empty,
-      Map.empty,
-      secretNamesToMountPaths,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
+    val kubernetesConf = KubernetesTestConf.createExecutorConf(
+      secretNamesToMountPaths = secretNamesToMountPaths)
 
     val step = new MountSecretsFeatureStep(kubernetesConf)
     val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod

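For reference, the mounting this step verifies boils down to one secret-backed volume plus one container mount per configured entry. A hedged sketch with the fabric8 builders (the helper and volume names are illustrative, not Spark's):

    import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}

    // One secret volume and one matching container mount per configured secret.
    def buildSecretMounts(
        secretNamesToMountPaths: Map[String, String]): Seq[(Volume, VolumeMount)] =
      secretNamesToMountPaths.toSeq.map { case (secretName, mountPath) =>
        val volume = new VolumeBuilder()
          .withName(s"$secretName-volume")
          .withNewSecret()
            .withSecretName(secretName)
          .endSecret()
          .build()
        val mount = new VolumeMountBuilder()
          .withName(s"$secretName-volume")
          .withMountPath(mountPath)
          .build()
        (volume, mount)
      }
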
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
index aadbf16..e6f1dd6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala
@@ -16,29 +16,12 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class MountVolumesFeatureStepSuite extends SparkFunSuite {
-  private val sparkConf = new SparkConf(false)
-  private val emptyKubernetesConf = KubernetesConf(
-    sparkConf = sparkConf,
-    roleSpecificConf = KubernetesDriverSpecificConf(
-      JavaMainAppResource(None),
-      "app-name",
-      "main",
-      Seq.empty),
-    appResourceNamePrefix = "resource",
-    appId = "app-id",
-    roleLabels = Map.empty,
-    roleAnnotations = Map.empty,
-    roleSecretNamesToMountPaths = Map.empty,
-    roleSecretEnvNamesToKeyRefs = Map.empty,
-    roleEnvs = Map.empty,
-    roleVolumes = Nil,
-    hadoopConfSpec = None)
-
   test("Mounts hostPath volumes") {
     val volumeConf = KubernetesVolumeSpec(
       "testVolume",
@@ -47,7 +30,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       false,
       KubernetesHostPathVolumeConf("/hostPath/tmp")
     )
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -67,7 +50,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -89,7 +72,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       false,
       KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
     )
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -111,7 +94,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       false,
       KubernetesEmptyDirVolumeConf(None, None)
     )
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -140,8 +123,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
-    val volumesConf = hpVolumeConf :: pvcVolumeConf :: Nil
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumesConf)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(
+      volumes = Seq(hpVolumeConf, pvcVolumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -157,7 +140,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       false,
       KubernetesEmptyDirVolumeConf(None, None)
     )
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -176,7 +159,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       true,
       KubernetesPVCVolumeConf("pvcClaim")
     )
-    val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
@@ -206,19 +189,18 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
       true,
       KubernetesEmptyDirVolumeConf(None, None)
     )
-    val kubernetesConf = emptyKubernetesConf.copy(
-      roleVolumes = emptyDirSpec :: pvcSpec :: Nil)
+    val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(emptyDirSpec, pvcSpec))
     val step = new MountVolumesFeatureStep(kubernetesConf)
     val configuredPod = step.configurePod(SparkPod.initialPod())
 
     assert(configuredPod.pod.getSpec.getVolumes.size() === 2)
-    val mounts = configuredPod.container.getVolumeMounts
-    assert(mounts.size() === 2)
-    assert(mounts.get(0).getName === "testEmptyDir")
-    assert(mounts.get(0).getMountPath === "/tmp/foo")
-    assert(mounts.get(0).getSubPath === "foo")
-    assert(mounts.get(1).getName === "testPVC")
-    assert(mounts.get(1).getMountPath === "/tmp/bar")
-    assert(mounts.get(1).getSubPath === "bar")
+    val mounts = configuredPod.container.getVolumeMounts.asScala.sortBy(_.getName())
+    assert(mounts.size === 2)
+    assert(mounts(0).getName === "testEmptyDir")
+    assert(mounts(0).getMountPath === "/tmp/foo")
+    assert(mounts(0).getSubPath === "foo")
+    assert(mounts(1).getName === "testPVC")
+    assert(mounts(1).getMountPath === "/tmp/bar")
+    assert(mounts(1).getSubPath === "bar")
   }
 }

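Note that the final hunk sorts the mounts by name before asserting, so the test no longer depends on the order in which the step adds them. For orientation, a reconstruction of the volume spec shape exercised above, inferred from the call sites (the real field names may differ):

    // Inferred from the call sites: name, mount path, subpath, read-only flag,
    // and a type-specific conf per supported volume kind.
    sealed trait KubernetesVolumeSpecificConf
    case class KubernetesHostPathVolumeConf(hostPath: String)
      extends KubernetesVolumeSpecificConf
    case class KubernetesPVCVolumeConf(claimName: String)
      extends KubernetesVolumeSpecificConf
    case class KubernetesEmptyDirVolumeConf(medium: Option[String], sizeLimit: Option[String])
      extends KubernetesVolumeSpecificConf

    case class KubernetesVolumeSpec(
        volumeName: String,
        mountPath: String,
        mountSubPath: String,
        mountReadOnly: Boolean,
        volumeConf: KubernetesVolumeSpecificConf)
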
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
index 370948c..7295b82 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -20,40 +20,22 @@ import java.io.{File, PrintWriter}
 import java.nio.file.Files
 
 import io.fabric8.kubernetes.api.model.ConfigMap
-import org.mockito.Mockito
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
-  private var sparkConf: SparkConf = _
-  private var kubernetesConf : KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
+  private var kubernetesConf : KubernetesConf = _
   private var templateFile: File = _
 
   before {
-    sparkConf = Mockito.mock(classOf[SparkConf])
-    kubernetesConf = KubernetesConf(
-      sparkConf,
-      KubernetesDriverSpecificConf(
-        JavaMainAppResource(None),
-        "app-name",
-        "main",
-        Seq.empty),
-      "resource",
-      "app-id",
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      Option.empty)
     templateFile = Files.createTempFile("pod-template", "yml").toFile
     templateFile.deleteOnExit()
-    Mockito.doReturn(Option(templateFile.getAbsolutePath)).when(sparkConf)
-      .get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+
+    val sparkConf = new SparkConf(false)
+      .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath)
+    kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
   }
 
   test("Mounts executor template volume if config specified") {

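The before block now writes a real temporary template file and points the executor template setting at it, instead of stubbing the conf lookup with Mockito. A hedged sketch of what consuming such a file into a ConfigMap can look like (the function, ConfigMap name, and data key are illustrative, not Spark's):

    import java.io.File
    import java.nio.charset.StandardCharsets
    import java.nio.file.Files

    import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}

    // Read the pod template and expose it as a ConfigMap entry that the
    // driver can later mount for executors.
    def templateConfigMap(templateFile: File): ConfigMap = {
      val contents = new String(Files.readAllBytes(templateFile.toPath), StandardCharsets.UTF_8)
      new ConfigMapBuilder()
        .withNewMetadata()
          .withName("driver-podspec-configmap")
        .endMetadata()
        .addToData("pod-spec-template.yml", contents)
        .build()
    }
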
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 08f2875..e9c05fe 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -24,8 +24,8 @@ import org.mockito.Mockito.{doReturn, verify, when}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.mockito.MockitoSugar._
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 
@@ -37,10 +37,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private val KUBERNETES_RESOURCE_PREFIX = "resource-example"
   private val POD_NAME = "driver"
   private val CONTAINER_NAME = "container"
-  private val APP_ID = "app-id"
-  private val APP_NAME = "app"
-  private val MAIN_CLASS = "main"
-  private val APP_ARGS = Seq("arg1", "arg2")
   private val RESOLVED_JAVA_OPTIONS = Map(
     "conf1key" -> "conf1value",
     "conf2key" -> "conf2value")
@@ -122,28 +118,15 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var resourceList: RESOURCE_LIST = _
 
-  private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _
-
-  private var sparkConf: SparkConf = _
+  private var kconf: KubernetesDriverConf = _
   private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _
   private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _
 
   before {
     MockitoAnnotations.initMocks(this)
-    sparkConf = new SparkConf(false)
-    kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf](
-      sparkConf,
-      KubernetesDriverSpecificConf(JavaMainAppResource(None), MAIN_CLASS, APP_NAME, APP_ARGS),
-      KUBERNETES_RESOURCE_PREFIX,
-      APP_ID,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Map.empty,
-      Nil,
-      hadoopConfSpec = None)
-    when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC)
+    kconf = KubernetesTestConf.createDriverConf(
+      resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
+    when(driverBuilder.buildFromFeatures(kconf)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
     when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
 
@@ -158,26 +141,22 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("The client should configure the pod using the builder.") {
     val submissionClient = new Client(
+      kconf,
       driverBuilder,
-      kubernetesConf,
       kubernetesClient,
       false,
-      "spark",
-      loggingPodStatusWatcher,
-      KUBERNETES_RESOURCE_PREFIX)
+      loggingPodStatusWatcher)
     submissionClient.run()
     verify(podOperations).create(FULL_EXPECTED_POD)
   }
 
   test("The client should create Kubernetes resources") {
     val submissionClient = new Client(
+      kconf,
       driverBuilder,
-      kubernetesConf,
       kubernetesClient,
       false,
-      "spark",
-      loggingPodStatusWatcher,
-      KUBERNETES_RESOURCE_PREFIX)
+      loggingPodStatusWatcher)
     submissionClient.run()
     val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues
     assert(otherCreatedResources.size === 2)
@@ -197,13 +176,11 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("Waiting for app completion should stall on the watcher") {
     val submissionClient = new Client(
+      kconf,
       driverBuilder,
-      kubernetesConf,
       kubernetesClient,
       true,
-      "spark",
-      loggingPodStatusWatcher,
-      KUBERNETES_RESOURCE_PREFIX)
+      loggingPodStatusWatcher)
     submissionClient.run()
     verify(loggingPodStatusWatcher).awaitCompletion()
   }

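The Client constructor loses the explicit "spark" app name and resource prefix arguments because both now travel inside the conf. A sketch of the constructor shape implied by the updated call sites (only the argument order is visible in the diff; the parameter names here are guesses):

    import io.fabric8.kubernetes.client.KubernetesClient

    import org.apache.spark.deploy.k8s.KubernetesDriverConf
    import org.apache.spark.deploy.k8s.submit.{KubernetesDriverBuilder, LoggingPodStatusWatcher}

    // Shape inferred from the three call sites above.
    private[spark] class Client(
        conf: KubernetesDriverConf,
        builder: KubernetesDriverBuilder,
        kubernetesClient: KubernetesClient,
        waitForAppCompletion: Boolean,
        watcher: LoggingPodStatusWatcher) {

      def run(): Unit = {
        // Build the driver spec from conf, create the driver pod and its
        // additional resources, then block on the watcher when asked to wait.
      }
    }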
