http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 167fb40..a5ad972 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -21,57 +21,46 @@ import java.io.File import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ private[spark] class KubernetesDriverBuilder( - provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = + provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) = new BasicDriverFeatureStep(_), - provideCredentialsStep: (KubernetesConf[KubernetesDriverSpecificConf]) - => DriverKubernetesCredentialsFeatureStep = + provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) = new DriverKubernetesCredentialsFeatureStep(_), - provideServiceStep: (KubernetesConf[KubernetesDriverSpecificConf]) => DriverServiceFeatureStep = + provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) = new DriverServiceFeatureStep(_), - provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => MountSecretsFeatureStep) = + provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) = new MountSecretsFeatureStep(_), - provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => EnvSecretsFeatureStep) = + provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) = new EnvSecretsFeatureStep(_), - provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) - => LocalDirsFeatureStep = + provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) = new LocalDirsFeatureStep(_), - provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => MountVolumesFeatureStep) = + provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), - provideDriverCommandStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => DriverCommandFeatureStep) = + provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) = new DriverCommandFeatureStep(_), - provideHadoopGlobalStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => KerberosConfDriverFeatureStep) = - new KerberosConfDriverFeatureStep(_), - providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => PodTemplateConfigMapStep) = - new PodTemplateConfigMapStep(_), - provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) { + provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) = + new KerberosConfDriverFeatureStep(_), + providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) = + new PodTemplateConfigMapStep(_), + provideInitialPod: () => SparkPod = () => SparkPod.initialPod) { - def buildFromFeatures( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { + def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = { val baseFeatures = Seq( provideBasicStep(kubernetesConf), provideCredentialsStep(kubernetesConf), provideServiceStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) - val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { + val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) { Seq(provideSecretsStep(kubernetesConf)) } else Nil - val envSecretFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { + val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) { Seq(provideEnvSecretsStep(kubernetesConf)) } else Nil - val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { + val volumesFeature = if (kubernetesConf.volumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil val podTemplateFeature = if ( @@ -81,14 +70,12 @@ private[spark] class KubernetesDriverBuilder( val driverCommandStep = provideDriverCommandStep(kubernetesConf) - val maybeHadoopConfigStep = - kubernetesConf.hadoopConfSpec.map { _ => - provideHadoopGlobalStep(kubernetesConf)} + val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf)) val allFeatures: Seq[KubernetesFeatureConfigStep] = baseFeatures ++ Seq(driverCommandStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature ++ - maybeHadoopConfigStep.toSeq ++ podTemplateFeature + hadoopConfigStep ++ podTemplateFeature var spec = KubernetesDriverSpec( provideInitialPod(),
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index fc41a47..d24ff0d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -26,50 +26,38 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ private[spark] class KubernetesExecutorBuilder( - provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf]) - => BasicExecutorFeatureStep = + provideBasicStep: (KubernetesExecutorConf => BasicExecutorFeatureStep) = new BasicExecutorFeatureStep(_), - provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) - => MountSecretsFeatureStep = + provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) = new MountSecretsFeatureStep(_), - provideEnvSecretsStep: - (KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) = + provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) = new EnvSecretsFeatureStep(_), - provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) - => LocalDirsFeatureStep = + provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) = new LocalDirsFeatureStep(_), - provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] - => MountVolumesFeatureStep) = + provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), - provideHadoopConfStep: ( - KubernetesConf[KubernetesExecutorSpecificConf] - => HadoopConfExecutorFeatureStep) = + provideHadoopConfStep: (KubernetesExecutorConf => HadoopConfExecutorFeatureStep) = new HadoopConfExecutorFeatureStep(_), - provideKerberosConfStep: ( - KubernetesConf[KubernetesExecutorSpecificConf] - => KerberosConfExecutorFeatureStep) = + provideKerberosConfStep: (KubernetesExecutorConf => KerberosConfExecutorFeatureStep) = new KerberosConfExecutorFeatureStep(_), - provideHadoopSparkUserStep: ( - KubernetesConf[KubernetesExecutorSpecificConf] - => HadoopSparkUserExecutorFeatureStep) = + provideHadoopSparkUserStep: (KubernetesExecutorConf => HadoopSparkUserExecutorFeatureStep) = new HadoopSparkUserExecutorFeatureStep(_), provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) { - def buildFromFeatures( - kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = { + def buildFromFeatures(kubernetesConf: KubernetesExecutorConf): SparkPod = { val sparkConf = kubernetesConf.sparkConf val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME) val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME) val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY) val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) - val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { + val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) { Seq(provideSecretsStep(kubernetesConf)) } else Nil - val secretEnvFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { + val secretEnvFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) { Seq(provideEnvSecretsStep(kubernetesConf)) } else Nil - val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { + val volumesFeature = if (kubernetesConf.volumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 41ca8d1..f4d40b0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -26,10 +26,6 @@ import org.apache.spark.deploy.k8s.submit._ class KubernetesConfSuite extends SparkFunSuite { - private val APP_NAME = "test-app" - private val RESOURCE_NAME_PREFIX = "prefix" - private val APP_ID = "test-id" - private val MAIN_CLASS = "test-class" private val APP_ARGS = Array("arg1", "arg2") private val CUSTOM_LABELS = Map( "customLabel1Key" -> "customLabel1Value", @@ -49,26 +45,6 @@ class KubernetesConfSuite extends SparkFunSuite { private val DRIVER_POD = new PodBuilder().build() private val EXECUTOR_ID = "executor-id" - test("Basic driver translated fields.") { - val sparkConf = new SparkConf(false) - val conf = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource = JavaMainAppResource(None), - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(conf.appId === APP_ID) - assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap) - assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX) - assert(conf.roleSpecificConf.appName === APP_NAME) - assert(conf.roleSpecificConf.mainClass === MAIN_CLASS) - assert(conf.roleSpecificConf.appArgs === APP_ARGS) - } - test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") { val sparkConf = new SparkConf(false) .set(MEMORY_OVERHEAD_FACTOR, 0.3) @@ -90,22 +66,19 @@ class KubernetesConfSuite extends SparkFunSuite { val conf = KubernetesConf.createDriverConf( sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource = JavaMainAppResource(None), - MAIN_CLASS, + KubernetesTestConf.APP_ID, + JavaMainAppResource(None), + KubernetesTestConf.MAIN_CLASS, APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(conf.roleLabels === Map( - SPARK_APP_ID_LABEL -> APP_ID, + None) + assert(conf.labels === Map( + SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++ CUSTOM_LABELS) - assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS) - assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) - assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS) - assert(conf.roleEnvs === CUSTOM_ENVS) + assert(conf.annotations === CUSTOM_ANNOTATIONS) + assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) + assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) + assert(conf.environment === CUSTOM_ENVS) assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) } @@ -113,20 +86,20 @@ class KubernetesConfSuite extends SparkFunSuite { val conf = KubernetesConf.createExecutorConf( new SparkConf(false), EXECUTOR_ID, - APP_ID, + KubernetesTestConf.APP_ID, Some(DRIVER_POD)) - assert(conf.roleSpecificConf.executorId === EXECUTOR_ID) - assert(conf.roleSpecificConf.driverPod.get === DRIVER_POD) + assert(conf.executorId === EXECUTOR_ID) + assert(conf.driverPod.get === DRIVER_POD) } test("Image pull secrets.") { val conf = KubernetesConf.createExecutorConf( new SparkConf(false) - .set(IMAGE_PULL_SECRETS, "my-secret-1,my-secret-2 "), + .set(IMAGE_PULL_SECRETS, Seq("my-secret-1", "my-secret-2 ")), EXECUTOR_ID, - APP_ID, + KubernetesTestConf.APP_ID, Some(DRIVER_POD)) - assert(conf.imagePullSecrets() === + assert(conf.imagePullSecrets === Seq( new LocalObjectReferenceBuilder().withName("my-secret-1").build(), new LocalObjectReferenceBuilder().withName("my-secret-2").build())) @@ -150,14 +123,14 @@ class KubernetesConfSuite extends SparkFunSuite { val conf = KubernetesConf.createExecutorConf( sparkConf, EXECUTOR_ID, - APP_ID, + KubernetesTestConf.APP_ID, Some(DRIVER_POD)) - assert(conf.roleLabels === Map( + assert(conf.labels === Map( SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID, - SPARK_APP_ID_LABEL -> APP_ID, + SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID, SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS) - assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS) - assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) - assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS) + assert(conf.annotations === CUSTOM_ANNOTATIONS) + assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) + assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) } } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala new file mode 100644 index 0000000..1d77a6d --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} + +/** + * Builder methods for KubernetesConf that allow easy control over what to return for a few + * properties. For use with tests instead of having to mock specific properties. + */ +object KubernetesTestConf { + + val APP_ID = "appId" + val MAIN_CLASS = "mainClass" + val RESOURCE_PREFIX = "prefix" + val EXECUTOR_ID = "1" + + private val DEFAULT_CONF = new SparkConf(false) + + // scalastyle:off argcount + def createDriverConf( + sparkConf: SparkConf = DEFAULT_CONF, + appId: String = APP_ID, + mainAppResource: MainAppResource = JavaMainAppResource(None), + mainClass: String = MAIN_CLASS, + appArgs: Array[String] = Array.empty, + pyFiles: Seq[String] = Nil, + resourceNamePrefix: Option[String] = None, + labels: Map[String, String] = Map.empty, + environment: Map[String, String] = Map.empty, + annotations: Map[String, String] = Map.empty, + secretEnvNamesToKeyRefs: Map[String, String] = Map.empty, + secretNamesToMountPaths: Map[String, String] = Map.empty, + volumes: Seq[KubernetesVolumeSpec] = Seq.empty): KubernetesDriverConf = { + val conf = sparkConf.clone() + + resourceNamePrefix.foreach { prefix => + conf.set(KUBERNETES_DRIVER_POD_NAME_PREFIX, prefix) + } + setPrefixedConfigs(conf, KUBERNETES_DRIVER_LABEL_PREFIX, labels) + setPrefixedConfigs(conf, KUBERNETES_DRIVER_ENV_PREFIX, environment) + setPrefixedConfigs(conf, KUBERNETES_DRIVER_ANNOTATION_PREFIX, annotations) + setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRETS_PREFIX, secretNamesToMountPaths) + setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs) + setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes) + + new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, pyFiles) + } + // scalastyle:on argcount + + def createExecutorConf( + sparkConf: SparkConf = DEFAULT_CONF, + driverPod: Option[Pod] = None, + labels: Map[String, String] = Map.empty, + environment: Map[String, String] = Map.empty, + annotations: Map[String, String] = Map.empty, + secretEnvNamesToKeyRefs: Map[String, String] = Map.empty, + secretNamesToMountPaths: Map[String, String] = Map.empty, + volumes: Seq[KubernetesVolumeSpec] = Seq.empty): KubernetesExecutorConf = { + val conf = sparkConf.clone() + + setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_LABEL_PREFIX, labels) + setPrefixedConfigs(conf, "spark.executorEnv.", environment) + setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX, annotations) + setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_SECRETS_PREFIX, secretNamesToMountPaths) + setPrefixedConfigs(conf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs) + setVolumeSpecs(conf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX, volumes) + + new KubernetesExecutorConf(conf, APP_ID, EXECUTOR_ID, driverPod) + } + + private def setPrefixedConfigs( + conf: SparkConf, + prefix: String, + values: Map[String, String]): Unit = { + values.foreach { case (k, v) => + conf.set(s"${prefix}$k", v) + } + } + + private def setVolumeSpecs( + conf: SparkConf, + prefix: String, + volumes: Seq[KubernetesVolumeSpec]): Unit = { + def key(vtype: String, vname: String, subkey: String): String = { + s"${prefix}$vtype.$vname.$subkey" + } + + volumes.foreach { case spec => + val (vtype, configs) = spec.volumeConf match { + case KubernetesHostPathVolumeConf(path) => + (KUBERNETES_VOLUMES_HOSTPATH_TYPE, + Map(KUBERNETES_VOLUMES_OPTIONS_PATH_KEY -> path)) + + case KubernetesPVCVolumeConf(claimName) => + (KUBERNETES_VOLUMES_PVC_TYPE, + Map(KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY -> claimName)) + + case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => + val mconf = medium.map { m => (KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY, m) }.toMap + val lconf = sizeLimit.map { l => (KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY, l) }.toMap + (KUBERNETES_VOLUMES_EMPTYDIR_TYPE, mconf ++ lconf) + } + + conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_PATH_KEY), spec.mountPath) + if (spec.mountSubPath.nonEmpty) { + conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_SUBPATH_KEY), + spec.mountSubPath) + } + conf.set(key(vtype, spec.volumeName, KUBERNETES_VOLUMES_MOUNT_READONLY_KEY), + spec.mountReadOnly.toString) + configs.foreach { case (k, v) => + conf.set(key(vtype, spec.volumeName, k), v) + } + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index de79a58..c079089 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -25,7 +25,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true") sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head assert(volumeSpec.volumeName === "volumeName") assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly === true) @@ -39,7 +39,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true") sparkConf.set("test.emptyDir.volumeName.mount.subPath", "subPath") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head assert(volumeSpec.volumeName === "volumeName") assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountSubPath === "subPath") @@ -51,7 +51,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimeName") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head assert(volumeSpec.volumeName === "volumeName") assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly === true) @@ -66,7 +66,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { sparkConf.set("test.emptyDir.volumeName.options.medium", "medium") sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5G") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head assert(volumeSpec.volumeName === "volumeName") assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly === true) @@ -79,7 +79,7 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { sparkConf.set("test.emptyDir.volumeName.mount.path", "/path") sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head assert(volumeSpec.volumeName === "volumeName") assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly === true) @@ -92,27 +92,29 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { sparkConf.set("test.hostPath.volumeName.mount.path", "/path") sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head assert(volumeSpec.mountReadOnly === false) } - test("Gracefully fails on missing mount key") { + test("Fails on missing mount key") { val sparkConf = new SparkConf(false) sparkConf.set("test.emptyDir.volumeName.mnt.path", "/path") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head - assert(volumeSpec.isFailure === true) - assert(volumeSpec.failed.get.getMessage === "emptyDir.volumeName.mount.path") + val e = intercept[NoSuchElementException] { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.") + } + assert(e.getMessage.contains("emptyDir.volumeName.mount.path")) } - test("Gracefully fails on missing option key") { + test("Fails on missing option key") { val sparkConf = new SparkConf(false) sparkConf.set("test.hostPath.volumeName.mount.path", "/path") sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true") sparkConf.set("test.hostPath.volumeName.options.pth", "/hostPath") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head - assert(volumeSpec.isFailure === true) - assert(volumeSpec.failed.get.getMessage === "hostPath.volumeName.options.path") + val e = intercept[NoSuchElementException] { + KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.") + } + assert(e.getMessage.contains("hostPath.volumeName.options.path")) } } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 1e7dfbe..e4951bc 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ @@ -30,32 +30,17 @@ import org.apache.spark.ui.SparkUI class BasicDriverFeatureStepSuite extends SparkFunSuite { - private val APP_ID = "spark-app-id" - private val RESOURCE_NAME_PREFIX = "spark" private val DRIVER_LABELS = Map("labelkey" -> "labelvalue") private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent" - private val APP_NAME = "spark-test" - private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" - private val PY_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" - private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"") - private val CUSTOM_ANNOTATION_KEY = "customAnnotation" - private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue" - private val DRIVER_ANNOTATIONS = Map(CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE) - private val DRIVER_CUSTOM_ENV1 = "customDriverEnv1" - private val DRIVER_CUSTOM_ENV2 = "customDriverEnv2" + private val DRIVER_ANNOTATIONS = Map("customAnnotation" -> "customAnnotationValue") private val DRIVER_ENVS = Map( - DRIVER_CUSTOM_ENV1 -> DRIVER_CUSTOM_ENV1, - DRIVER_CUSTOM_ENV2 -> DRIVER_CUSTOM_ENV2) + "customDriverEnv1" -> "customDriverEnv2", + "customDriverEnv2" -> "customDriverEnv2") private val TEST_IMAGE_PULL_SECRETS = Seq("my-secret-1", "my-secret-2") private val TEST_IMAGE_PULL_SECRET_OBJECTS = TEST_IMAGE_PULL_SECRETS.map { secret => new LocalObjectReferenceBuilder().withName(secret).build() } - private val emptyDriverSpecificConf = KubernetesDriverSpecificConf( - JavaMainAppResource(None), - APP_NAME, - MAIN_CLASS, - APP_ARGS) test("Check the pod respects all configurations from the user.") { val sparkConf = new SparkConf() @@ -65,19 +50,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(DRIVER_MEMORY.key, "256M") .set(DRIVER_MEMORY_OVERHEAD, 200L) .set(CONTAINER_IMAGE, "spark-driver:latest") - .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) - val kubernetesConf = KubernetesConf( - sparkConf, - emptyDriverSpecificConf, - RESOURCE_NAME_PREFIX, - APP_ID, - DRIVER_LABELS, - DRIVER_ANNOTATIONS, - Map.empty, - Map.empty, - DRIVER_ENVS, - Nil, - hadoopConfSpec = None) + .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS) + val kubernetesConf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + labels = DRIVER_LABELS, + environment = DRIVER_ENVS, + annotations = DRIVER_ANNOTATIONS) val featureStep = new BasicDriverFeatureStep(kubernetesConf) val basePod = SparkPod.initialPod() @@ -99,10 +77,11 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val envs = configuredPod.container .getEnv .asScala - .map(env => (env.getName, env.getValue)) + .map { env => (env.getName, env.getValue) } .toMap - assert(envs(DRIVER_CUSTOM_ENV1) === DRIVER_ENVS(DRIVER_CUSTOM_ENV1)) - assert(envs(DRIVER_CUSTOM_ENV2) === DRIVER_ENVS(DRIVER_CUSTOM_ENV2)) + DRIVER_ENVS.foreach { case (k, v) => + assert(envs(v) === v) + } assert(configuredPod.pod.getSpec().getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS) @@ -122,13 +101,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val driverPodMetadata = configuredPod.pod.getMetadata assert(driverPodMetadata.getName === "spark-driver-pod") - assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS) + DRIVER_LABELS.foreach { case (k, v) => + assert(driverPodMetadata.getLabels.get(k) === v) + } assert(driverPodMetadata.getAnnotations.asScala === DRIVER_ANNOTATIONS) assert(configuredPod.pod.getSpec.getRestartPolicy === "Never") val expectedSparkConf = Map( KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", - "spark.app.id" -> APP_ID, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, + "spark.app.id" -> KubernetesTestConf.APP_ID, + KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix, "spark.kubernetes.submitInDriver" -> "true", MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) @@ -141,39 +122,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val pythonSparkConf = new SparkConf() .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver-py:latest") - val javaKubernetesConf = KubernetesConf( - javaSparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - APP_NAME, - PY_MAIN_CLASS, - APP_ARGS), - RESOURCE_NAME_PREFIX, - APP_ID, - DRIVER_LABELS, - DRIVER_ANNOTATIONS, - Map.empty, - Map.empty, - DRIVER_ENVS, - Nil, - hadoopConfSpec = None) - - val pythonKubernetesConf = KubernetesConf( - pythonSparkConf, - KubernetesDriverSpecificConf( - PythonMainAppResource(""), - APP_NAME, - PY_MAIN_CLASS, - APP_ARGS), - RESOURCE_NAME_PREFIX, - APP_ID, - DRIVER_LABELS, - DRIVER_ANNOTATIONS, - Map.empty, - Map.empty, - DRIVER_ENVS, - Nil, - hadoopConfSpec = None) + val javaKubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = javaSparkConf) + val pythonKubernetesConf = KubernetesTestConf.createDriverConf( + sparkConf = pythonSparkConf, + mainAppResource = PythonMainAppResource("")) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) val basePod = SparkPod.initialPod() @@ -191,25 +143,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .setJars(allJars) .set("spark.files", allFiles.mkString(",")) .set(CONTAINER_IMAGE, "spark-driver:latest") - val kubernetesConf = KubernetesConf( - sparkConf, - emptyDriverSpecificConf, - RESOURCE_NAME_PREFIX, - APP_ID, - DRIVER_LABELS, - DRIVER_ANNOTATIONS, - Map.empty, - Map.empty, - DRIVER_ENVS, - Nil, - hadoopConfSpec = None) + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) val step = new BasicDriverFeatureStep(kubernetesConf) val additionalProperties = step.getAdditionalPodSystemProperties() val expectedSparkConf = Map( KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", - "spark.app.id" -> APP_ID, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, + "spark.app.id" -> KubernetesTestConf.APP_ID, + KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", @@ -234,19 +175,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(CONTAINER_IMAGE, "spark-driver:latest") .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } - val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource) - val conf = KubernetesConf( - sparkConf, - driverConf, - RESOURCE_NAME_PREFIX, - APP_ID, - DRIVER_LABELS, - DRIVER_ANNOTATIONS, - Map.empty, - Map.empty, - DRIVER_ENVS, - Nil, - hadoopConfSpec = None) + val conf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + mainAppResource = resource) val step = new BasicDriverFeatureStep(conf) val pod = step.configurePod(SparkPod.initialPod()) val mem = pod.container.getResources.getRequests.get("memory").getAmount() http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index e9a16aa..d6003c9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -19,20 +19,18 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ -import org.mockito.MockitoAnnotations -import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} +import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.config._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -class BasicExecutorFeatureStepSuite - extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { +class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { - private val APP_ID = "app-id" private val DRIVER_HOSTNAME = "localhost" private val DRIVER_PORT = 7098 private val DRIVER_ADDRESS = RpcEndpointAddress( @@ -45,7 +43,6 @@ class BasicExecutorFeatureStepSuite private val RESOURCE_NAME_PREFIX = "base" private val EXECUTOR_IMAGE = "executor-image" private val LABELS = Map("label1key" -> "label1value") - private val ANNOTATIONS = Map("annotation1key" -> "annotation1value") private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2") private val TEST_IMAGE_PULL_SECRET_OBJECTS = TEST_IMAGE_PULL_SECRETS.map { secret => @@ -66,37 +63,35 @@ class BasicExecutorFeatureStepSuite private var baseConf: SparkConf = _ before { - MockitoAnnotations.initMocks(this) baseConf = new SparkConf() .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX) .set(CONTAINER_IMAGE, EXECUTOR_IMAGE) .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true) - .set("spark.driver.host", DRIVER_HOSTNAME) + .set(DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME) .set("spark.driver.port", DRIVER_PORT.toString) - .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) + .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS) .set("spark.kubernetes.resource.type", "java") } + private def newExecutorConf( + environment: Map[String, String] = Map.empty): KubernetesExecutorConf = { + KubernetesTestConf.createExecutorConf( + sparkConf = baseConf, + driverPod = Some(DRIVER_POD), + labels = LABELS, + environment = environment) + } + test("basic executor pod has reasonable defaults") { - val step = new BasicExecutorFeatureStep( - KubernetesConf( - baseConf, - KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), - RESOURCE_NAME_PREFIX, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None)) + val step = new BasicExecutorFeatureStep(newExecutorConf()) val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1") - assert(executor.pod.getMetadata.getLabels.asScala === LABELS) + LABELS.foreach { case (k, v) => + assert(executor.pod.getMetadata.getLabels.get(k) === v) + } assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS) // There is exactly 1 container with no volume mounts and default memory limits. @@ -116,43 +111,18 @@ class BasicExecutorFeatureStepSuite } test("executor pod hostnames get truncated to 63 characters") { - val conf = baseConf.clone() val longPodNamePrefix = "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple" - val step = new BasicExecutorFeatureStep( - KubernetesConf( - conf, - KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), - longPodNamePrefix, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None)) + baseConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, longPodNamePrefix) + val step = new BasicExecutorFeatureStep(newExecutorConf()) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } test("classpath and extra java options get translated into environment variables") { - val conf = baseConf.clone() - conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") - conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") - - val step = new BasicExecutorFeatureStep( - KubernetesConf( - conf, - KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), - RESOURCE_NAME_PREFIX, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty, - Map("qux" -> "quux"), - Nil, - hadoopConfSpec = None)) + baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar") + baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz") + val kconf = newExecutorConf(environment = Map("qux" -> "quux")) + val step = new BasicExecutorFeatureStep(kconf) val executor = step.configurePod(SparkPod.initialPod()) checkEnv(executor, @@ -163,23 +133,10 @@ class BasicExecutorFeatureStepSuite } test("test executor pyspark memory") { - val conf = baseConf.clone() - conf.set("spark.kubernetes.resource.type", "python") - conf.set(org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY, 42L) - - val step = new BasicExecutorFeatureStep( - KubernetesConf( - conf, - KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), - RESOURCE_NAME_PREFIX, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None)) + baseConf.set("spark.kubernetes.resource.type", "python") + baseConf.set(PYSPARK_EXECUTOR_MEMORY, 42L) + + val step = new BasicExecutorFeatureStep(newExecutorConf()) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi") @@ -199,7 +156,7 @@ class BasicExecutorFeatureStepSuite ENV_DRIVER_URL -> DRIVER_ADDRESS.toString, ENV_EXECUTOR_CORES -> "1", ENV_EXECUTOR_MEMORY -> "1g", - ENV_APPLICATION_ID -> APP_ID, + ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID, ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala index 3067295..f74ac92 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -27,8 +27,6 @@ import org.apache.spark.util.Utils class DriverCommandFeatureStepSuite extends SparkFunSuite { - private val MAIN_CLASS = "mainClass" - test("java resource") { val mainResource = "local:///main.jar" val spec = applyFeatureStep( @@ -37,7 +35,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, - "--class", MAIN_CLASS, + "--class", KubernetesTestConf.MAIN_CLASS, "spark-internal", "5", "7")) val jars = Utils.stringToSeq(spec.systemProperties("spark.jars")) @@ -55,7 +53,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, - "--class", MAIN_CLASS, + "--class", KubernetesTestConf.MAIN_CLASS, "/main.py")) val envs = spec.pod.container.getEnv.asScala .map { env => (env.getName, env.getValue) } @@ -86,7 +84,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, - "--class", MAIN_CLASS, + "--class", KubernetesTestConf.MAIN_CLASS, "/main.py", "5", "7", "9")) val envs = spec.pod.container.getEnv.asScala @@ -112,7 +110,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, - "--class", MAIN_CLASS, + "--class", KubernetesTestConf.MAIN_CLASS, "/main.R", "5", "7", "9")) } @@ -121,20 +119,11 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { conf: SparkConf = new SparkConf(false), appArgs: Array[String] = Array(), pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { - val driverConf = new KubernetesDriverSpecificConf( - resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles) - val kubernetesConf = KubernetesConf( - conf, - driverConf, - "resource-prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) + val kubernetesConf = KubernetesTestConf.createDriverConf( + sparkConf = conf, + mainAppResource = resource, + appArgs = appArgs, + pyFiles = pyFiles) val step = new DriverCommandFeatureStep(kubernetesConf) val pod = step.configurePod(SparkPod.initialPod()) val props = step.getAdditionalPodSystemProperties() http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index 36c6616..7d8e929 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -18,51 +18,25 @@ package org.apache.spark.deploy.k8s.features import java.io.File +import scala.collection.JavaConverters._ + import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, Files} -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, Secret} -import org.mockito.{Mock, MockitoAnnotations} -import org.scalatest.BeforeAndAfter -import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.Secret import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.util.Utils -class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { +class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite { - private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark" - private val APP_ID = "k8s-app" - private var credentialsTempDirectory: File = _ + private val credentialsTempDirectory = Utils.createTempDir() private val BASE_DRIVER_POD = SparkPod.initialPod() - @Mock - private var driverSpecificConf: KubernetesDriverSpecificConf = _ - - before { - MockitoAnnotations.initMocks(this) - credentialsTempDirectory = Utils.createTempDir() - } - - after { - credentialsTempDirectory.delete() - } - test("Don't set any credentials") { - val kubernetesConf = KubernetesConf( - new SparkConf(false), - driverSpecificConf, - KUBERNETES_RESOURCE_NAME_PREFIX, - APP_ID, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) + val kubernetesConf = KubernetesTestConf.createDriverConf() val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty) @@ -83,19 +57,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef .set( s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", "/mnt/secrets/my-ca.pem") - val kubernetesConf = KubernetesConf( - submissionSparkConf, - driverSpecificConf, - KUBERNETES_RESOURCE_NAME_PREFIX, - APP_ID, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) - + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = submissionSparkConf) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty) @@ -122,18 +84,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef .set( s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", caCertFile.getAbsolutePath) - val kubernetesConf = KubernetesConf( - submissionSparkConf, - driverSpecificConf, - KUBERNETES_RESOURCE_NAME_PREFIX, - APP_ID, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = submissionSparkConf) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() val expectedSparkConf = Map( @@ -153,7 +104,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef .head .asInstanceOf[Secret] assert(credentialsSecret.getMetadata.getName === - s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials") + s"${kubernetesConf.resourceNamePrefix}-kubernetes-credentials") val decodedSecretData = credentialsSecret.getData.asScala.map { data => (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8)) } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 3c46667..0452789 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -16,24 +16,19 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.Service -import org.mockito.{Mock, MockitoAnnotations} -import org.mockito.Mockito.when -import org.scalatest.BeforeAndAfter import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.Service + import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource -import org.apache.spark.util.Clock - -class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { +import org.apache.spark.internal.config._ +import org.apache.spark.util.ManualClock - private val SHORT_RESOURCE_NAME_PREFIX = - "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH - - DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length) +class DriverServiceFeatureStepSuite extends SparkFunSuite { private val LONG_RESOURCE_NAME_PREFIX = "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH - @@ -42,34 +37,14 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { "label1key" -> "label1value", "label2key" -> "label2value") - @Mock - private var clock: Clock = _ - - private var sparkConf: SparkConf = _ - - before { - MockitoAnnotations.initMocks(this) - sparkConf = new SparkConf(false) - } - test("Headless service has a port for the driver RPC and the block manager.") { - sparkConf = sparkConf + val sparkConf = new SparkConf(false) .set("spark.driver.port", "9000") - .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) - val configurationStep = new DriverServiceFeatureStep( - KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(None), "main", "app", Seq.empty), - SHORT_RESOURCE_NAME_PREFIX, - "app-id", - DRIVER_LABELS, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None)) + .set(DRIVER_BLOCK_MANAGER_PORT, 8080) + val kconf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + labels = DRIVER_LABELS) + val configurationStep = new DriverServiceFeatureStep(kconf) assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service]) @@ -80,50 +55,28 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { verifyService( 9000, 8080, - s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", + s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", driverService) } test("Hostname and ports are set according to the service name.") { - val configurationStep = new DriverServiceFeatureStep( - KubernetesConf( - sparkConf - .set("spark.driver.port", "9000") - .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) - .set(KUBERNETES_NAMESPACE, "my-namespace"), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), "main", "app", Seq.empty), - SHORT_RESOURCE_NAME_PREFIX, - "app-id", - DRIVER_LABELS, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None)) - val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + - DriverServiceFeatureStep.DRIVER_SVC_POSTFIX + val sparkConf = new SparkConf(false) + .set("spark.driver.port", "9000") + .set(DRIVER_BLOCK_MANAGER_PORT, 8080) + .set(KUBERNETES_NAMESPACE, "my-namespace") + val kconf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + labels = DRIVER_LABELS) + val configurationStep = new DriverServiceFeatureStep(kconf) + val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX val expectedHostName = s"$expectedServiceName.my-namespace.svc" val additionalProps = configurationStep.getAdditionalPodSystemProperties() verifySparkConfHostNames(additionalProps, expectedHostName) } test("Ports should resolve to defaults in SparkConf and in the service.") { - val configurationStep = new DriverServiceFeatureStep( - KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(None), "main", "app", Seq.empty), - SHORT_RESOURCE_NAME_PREFIX, - "app-id", - DRIVER_LABELS, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None)) + val kconf = KubernetesTestConf.createDriverConf(labels = DRIVER_LABELS) + val configurationStep = new DriverServiceFeatureStep(kconf) val resolvedService = configurationStep .getAdditionalKubernetesResources() .head @@ -131,30 +84,23 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { verifyService( DEFAULT_DRIVER_PORT, DEFAULT_BLOCKMANAGER_PORT, - s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", + s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", resolvedService) val additionalProps = configurationStep.getAdditionalPodSystemProperties() assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString) - assert(additionalProps(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key) - === DEFAULT_BLOCKMANAGER_PORT.toString) + assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString) } test("Long prefixes should switch to using a generated name.") { - when(clock.getTimeMillis()).thenReturn(10000) + val clock = new ManualClock() + clock.setTime(10000) + val sparkConf = new SparkConf(false) + .set(KUBERNETES_NAMESPACE, "my-namespace") val configurationStep = new DriverServiceFeatureStep( - KubernetesConf( - sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), "main", "app", Seq.empty), - LONG_RESOURCE_NAME_PREFIX, - "app-id", - DRIVER_LABELS, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None), + KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX), + labels = DRIVER_LABELS), clock) val driverService = configurationStep .getAdditionalKubernetesResources() @@ -168,56 +114,27 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("Disallow bind address and driver host to be set explicitly.") { - try { - new DriverServiceFeatureStep( - KubernetesConf( - sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), "main", "app", Seq.empty), - LONG_RESOURCE_NAME_PREFIX, - "app-id", - DRIVER_LABELS, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None), - clock) - fail("The driver bind address should not be allowed.") - } catch { - case e: Throwable => - assert(e.getMessage === - s"requirement failed: ${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" + - " not supported in Kubernetes mode, as the driver's bind address is managed" + - " and set to the driver pod's IP address.") + val sparkConf = new SparkConf(false) + .set(DRIVER_BIND_ADDRESS, "host") + .set("spark.app.name", LONG_RESOURCE_NAME_PREFIX) + val e1 = intercept[IllegalArgumentException] { + new DriverServiceFeatureStep(KubernetesTestConf.createDriverConf(sparkConf = sparkConf)) } - sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) - sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host") - try { - new DriverServiceFeatureStep( - KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(None), "main", "app", Seq.empty), - LONG_RESOURCE_NAME_PREFIX, - "app-id", - DRIVER_LABELS, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None), - clock) - fail("The driver host address should not be allowed.") - } catch { - case e: Throwable => - assert(e.getMessage === - s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} is" + - " not supported in Kubernetes mode, as the driver's hostname will be managed via" + - " a Kubernetes service.") + assert(e1.getMessage === + s"requirement failed: ${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" + + " not supported in Kubernetes mode, as the driver's bind address is managed" + + " and set to the driver pod's IP address.") + + sparkConf.remove(DRIVER_BIND_ADDRESS) + sparkConf.set(DRIVER_HOST_ADDRESS, "host") + + val e2 = intercept[IllegalArgumentException] { + new DriverServiceFeatureStep(KubernetesTestConf.createDriverConf(sparkConf = sparkConf)) } + assert(e2.getMessage === + s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} is" + + " not supported in Kubernetes mode, as the driver's hostname will be managed via" + + " a Kubernetes service.") } private def verifyService( @@ -227,7 +144,9 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { service: Service): Unit = { assert(service.getMetadata.getName === expectedServiceName) assert(service.getSpec.getClusterIP === "None") - assert(service.getSpec.getSelector.asScala === DRIVER_LABELS) + DRIVER_LABELS.foreach { case (k, v) => + assert(service.getSpec.getSelector.get(k) === v) + } assert(service.getSpec.getPorts.size() === 2) val driverServicePorts = service.getSpec.getPorts.asScala assert(driverServicePorts.head.getName === DRIVER_PORT_NAME) http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index 3d25307..0455526 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -16,12 +16,12 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.PodBuilder +import scala.collection.JavaConverters._ -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s._ -class EnvSecretsFeatureStepSuite extends SparkFunSuite{ +class EnvSecretsFeatureStepSuite extends SparkFunSuite { private val KEY_REF_NAME_FOO = "foo" private val KEY_REF_NAME_BAR = "bar" private val KEY_REF_KEY_FOO = "key_foo" @@ -34,28 +34,14 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ val envVarsToKeys = Map( ENV_NAME_BAR -> s"${KEY_REF_NAME_BAR}:${KEY_REF_KEY_BAR}", ENV_NAME_FOO -> s"${KEY_REF_NAME_FOO}:${KEY_REF_KEY_FOO}") - val sparkConf = new SparkConf(false) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesExecutorSpecificConf("1", Some(new PodBuilder().build())), - "resource-name-prefix", - "app-id", - Map.empty, - Map.empty, - Map.empty, - envVarsToKeys, - Map.empty, - Nil, - hadoopConfSpec = None) + val kubernetesConf = KubernetesTestConf.createDriverConf( + secretEnvNamesToKeyRefs = envVarsToKeys) val step = new EnvSecretsFeatureStep(kubernetesConf) - val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container - - val expectedVars = - Seq(s"${ENV_NAME_BAR}", s"${ENV_NAME_FOO}") - - expectedVars.foreach { envName => - assert(KubernetesFeaturesTestUtils.containerHasEnvVar(driverContainerWithEnvSecrets, envName)) + val container = step.configurePod(baseDriverPod).container + val containerEnvKeys = container.getEnv.asScala.map { v => v.getName }.toSet + envVarsToKeys.keys.foreach { envName => + assert(containerEnvKeys.contains(envName)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 894d824..8f34ce5 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -17,45 +17,19 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder} -import org.mockito.Mockito -import org.scalatest._ -import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.submit.JavaMainAppResource +import org.apache.spark.util.SparkConfWithEnv -class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { +class LocalDirsFeatureStepSuite extends SparkFunSuite { private val defaultLocalDir = "/var/data/default-local-dir" - private var sparkConf: SparkConf = _ - private var kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf] = _ - - before { - val realSparkConf = new SparkConf(false) - sparkConf = Mockito.spy(realSparkConf) - kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "app-name", - "main", - Seq.empty), - "resource", - "app-id", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) - } test("Resolve to default local dir if neither env nor configuration are set") { - Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") - Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") - val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) + val stepUnderTest = new LocalDirsFeatureStep(KubernetesTestConf.createDriverConf(), + defaultLocalDir) val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod()) assert(configuredPod.pod.getSpec.getVolumes.size === 1) assert(configuredPod.pod.getSpec.getVolumes.get(0) === @@ -79,8 +53,9 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("Use configured local dirs split on comma if provided.") { - Mockito.doReturn("/var/data/my-local-dir-1,/var/data/my-local-dir-2") - .when(sparkConf).getenv("SPARK_LOCAL_DIRS") + val sparkConf = new SparkConfWithEnv(Map( + "SPARK_LOCAL_DIRS" -> "/var/data/my-local-dir-1,/var/data/my-local-dir-2")) + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod()) assert(configuredPod.pod.getSpec.getVolumes.size === 2) @@ -116,9 +91,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { } test("Use tmpfs to back default local dir") { - Mockito.doReturn(null).when(sparkConf).get("spark.local.dir") - Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS") - Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS) + val sparkConf = new SparkConf(false).set(KUBERNETES_LOCAL_DIRS_TMPFS, true) + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod()) assert(configuredPod.pod.getSpec.getVolumes.size === 1) http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index 1555f6a..22f6d26 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -16,10 +16,8 @@ */ package org.apache.spark.deploy.k8s.features -import io.fabric8.kubernetes.api.model.PodBuilder - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SecretVolumeUtils, SparkPod} +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{KubernetesTestConf, SecretVolumeUtils, SparkPod} class MountSecretsFeatureStepSuite extends SparkFunSuite { @@ -32,19 +30,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { val secretNamesToMountPaths = Map( SECRET_FOO -> SECRET_MOUNT_PATH, SECRET_BAR -> SECRET_MOUNT_PATH) - val sparkConf = new SparkConf(false) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesExecutorSpecificConf("1", Some(new PodBuilder().build())), - "resource-name-prefix", - "app-id", - Map.empty, - Map.empty, - secretNamesToMountPaths, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) + val kubernetesConf = KubernetesTestConf.createExecutorConf( + secretNamesToMountPaths = secretNamesToMountPaths) val step = new MountSecretsFeatureStep(kubernetesConf) val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index aadbf16..e6f1dd6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -16,29 +16,12 @@ */ package org.apache.spark.deploy.k8s.features +import scala.collection.JavaConverters._ + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class MountVolumesFeatureStepSuite extends SparkFunSuite { - private val sparkConf = new SparkConf(false) - private val emptyKubernetesConf = KubernetesConf( - sparkConf = sparkConf, - roleSpecificConf = KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "app-name", - "main", - Seq.empty), - appResourceNamePrefix = "resource", - appId = "app-id", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - hadoopConfSpec = None) - test("Mounts hostPath volumes") { val volumeConf = KubernetesVolumeSpec( "testVolume", @@ -47,7 +30,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { false, KubernetesHostPathVolumeConf("/hostPath/tmp") ) - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -67,7 +50,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { true, KubernetesPVCVolumeConf("pvcClaim") ) - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -89,7 +72,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { false, KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G")) ) - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -111,7 +94,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { false, KubernetesEmptyDirVolumeConf(None, None) ) - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -140,8 +123,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { true, KubernetesPVCVolumeConf("pvcClaim") ) - val volumesConf = hpVolumeConf :: pvcVolumeConf :: Nil - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumesConf) + val kubernetesConf = KubernetesTestConf.createDriverConf( + volumes = Seq(hpVolumeConf, pvcVolumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -157,7 +140,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { false, KubernetesEmptyDirVolumeConf(None, None) ) - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -176,7 +159,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { true, KubernetesPVCVolumeConf("pvcClaim") ) - val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) @@ -206,19 +189,18 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { true, KubernetesEmptyDirVolumeConf(None, None) ) - val kubernetesConf = emptyKubernetesConf.copy( - roleVolumes = emptyDirSpec :: pvcSpec :: Nil) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(emptyDirSpec, pvcSpec)) val step = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = step.configurePod(SparkPod.initialPod()) assert(configuredPod.pod.getSpec.getVolumes.size() === 2) - val mounts = configuredPod.container.getVolumeMounts - assert(mounts.size() === 2) - assert(mounts.get(0).getName === "testEmptyDir") - assert(mounts.get(0).getMountPath === "/tmp/foo") - assert(mounts.get(0).getSubPath === "foo") - assert(mounts.get(1).getName === "testPVC") - assert(mounts.get(1).getMountPath === "/tmp/bar") - assert(mounts.get(1).getSubPath === "bar") + val mounts = configuredPod.container.getVolumeMounts.asScala.sortBy(_.getName()) + assert(mounts.size === 2) + assert(mounts(0).getName === "testEmptyDir") + assert(mounts(0).getMountPath === "/tmp/foo") + assert(mounts(0).getSubPath === "foo") + assert(mounts(1).getName === "testPVC") + assert(mounts(1).getMountPath === "/tmp/bar") + assert(mounts(1).getSubPath === "bar") } } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala index 370948c..7295b82 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -20,40 +20,22 @@ import java.io.{File, PrintWriter} import java.nio.file.Files import io.fabric8.kubernetes.api.model.ConfigMap -import org.mockito.Mockito import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { - private var sparkConf: SparkConf = _ - private var kubernetesConf : KubernetesConf[_ <: KubernetesRoleSpecificConf] = _ + private var kubernetesConf : KubernetesConf = _ private var templateFile: File = _ before { - sparkConf = Mockito.mock(classOf[SparkConf]) - kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "app-name", - "main", - Seq.empty), - "resource", - "app-id", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Option.empty) templateFile = Files.createTempFile("pod-template", "yml").toFile templateFile.deleteOnExit() - Mockito.doReturn(Option(templateFile.getAbsolutePath)).when(sparkConf) - .get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + + val sparkConf = new SparkConf(false) + .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, templateFile.getAbsolutePath) + kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) } test("Mounts executor template volume if config specified") { http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 08f2875..e9c05fe 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -24,8 +24,8 @@ import org.mockito.Mockito.{doReturn, verify, when} import org.scalatest.BeforeAndAfter import org.scalatest.mockito.MockitoSugar._ -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod} +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ @@ -37,10 +37,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { private val KUBERNETES_RESOURCE_PREFIX = "resource-example" private val POD_NAME = "driver" private val CONTAINER_NAME = "container" - private val APP_ID = "app-id" - private val APP_NAME = "app" - private val MAIN_CLASS = "main" - private val APP_ARGS = Seq("arg1", "arg2") private val RESOLVED_JAVA_OPTIONS = Map( "conf1key" -> "conf1value", "conf2key" -> "conf2value") @@ -122,28 +118,15 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var resourceList: RESOURCE_LIST = _ - private var kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf] = _ - - private var sparkConf: SparkConf = _ + private var kconf: KubernetesDriverConf = _ private var createdPodArgumentCaptor: ArgumentCaptor[Pod] = _ private var createdResourcesArgumentCaptor: ArgumentCaptor[HasMetadata] = _ before { MockitoAnnotations.initMocks(this) - sparkConf = new SparkConf(false) - kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf]( - sparkConf, - KubernetesDriverSpecificConf(JavaMainAppResource(None), MAIN_CLASS, APP_NAME, APP_ARGS), - KUBERNETES_RESOURCE_PREFIX, - APP_ID, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) - when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) + kconf = KubernetesTestConf.createDriverConf( + resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)) + when(driverBuilder.buildFromFeatures(kconf)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(POD_NAME)).thenReturn(namedPods) @@ -158,26 +141,22 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { test("The client should configure the pod using the builder.") { val submissionClient = new Client( + kconf, driverBuilder, - kubernetesConf, kubernetesClient, false, - "spark", - loggingPodStatusWatcher, - KUBERNETES_RESOURCE_PREFIX) + loggingPodStatusWatcher) submissionClient.run() verify(podOperations).create(FULL_EXPECTED_POD) } test("The client should create Kubernetes resources") { val submissionClient = new Client( + kconf, driverBuilder, - kubernetesConf, kubernetesClient, false, - "spark", - loggingPodStatusWatcher, - KUBERNETES_RESOURCE_PREFIX) + loggingPodStatusWatcher) submissionClient.run() val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues assert(otherCreatedResources.size === 2) @@ -197,13 +176,11 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { test("Waiting for app completion should stall on the watcher") { val submissionClient = new Client( + kconf, driverBuilder, - kubernetesConf, kubernetesClient, true, - "spark", - loggingPodStatusWatcher, - KUBERNETES_RESOURCE_PREFIX) + loggingPodStatusWatcher) submissionClient.run() verify(loggingPodStatusWatcher).awaitCompletion() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org