Repository: spark Updated Branches: refs/heads/master 0820484ba -> 6c9c84ffb
http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala index c14af1d..2bcc646 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala @@ -53,7 +53,8 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new PythonDriverFeatureStep(kubernetesConf) val driverPod = step.configurePod(baseDriverPod).pod @@ -90,7 +91,8 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new PythonDriverFeatureStep(kubernetesConf) val driverContainerwithPySpark = step.configurePod(baseDriverPod).container val args = driverContainerwithPySpark http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala ---------------------------------------------------------------------- diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala index ace0faa..17af601 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala @@ -47,7 +47,8 @@ class RDriverFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Seq.empty, - sparkFiles = Seq.empty[String]) + sparkFiles = Seq.empty[String], + hadoopConfSpec = None) val step = new RDriverFeatureStep(kubernetesConf) val driverContainerwithR = step.configurePod(baseDriverPod).container http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 4d8e791..ae13df3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -142,7 +142,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(POD_NAME)).thenReturn(namedPods) 
http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 4117c54..051d7b6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep} import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -30,9 +29,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val LOCAL_DIRS_STEP_TYPE = "local-dirs" private val SECRETS_STEP_TYPE = "mount-secrets" private val JAVA_STEP_TYPE = "java-bindings" - private val PYSPARK_STEP_TYPE = "pyspark-bindings" private val R_STEP_TYPE = "r-bindings" + private val PYSPARK_STEP_TYPE = "pyspark-bindings" private val ENV_SECRETS_STEP_TYPE = "env-secrets" + private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" private val basicFeatureStep = 
KubernetesFeaturesTestUtils.getMockConfigStepForStepType( @@ -62,6 +62,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) + private val hadoopGlobalStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + HADOOP_GLOBAL_STEP_TYPE, classOf[KerberosConfDriverFeatureStep]) + private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) @@ -76,7 +79,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => mountVolumesStep, _ => pythonStep, _ => rStep, - _ => javaStep) + _ => javaStep, + _ => hadoopGlobalStep) test("Apply fundamental steps all the time.") { val conf = KubernetesConf( @@ -94,7 +98,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -120,7 +125,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map("EnvName" -> "SecretName:secretKey"), Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -148,7 +154,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -174,7 +181,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -205,7 +213,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, 
Map.empty, volumeSpec :: Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -232,7 +241,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -242,8 +252,71 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { R_STEP_TYPE) } + test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") { + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + hadoopConfSpec = Some( + HadoopConfSpec( + Some("/var/hadoop-conf"), + None))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) + } + + test("Apply HadoopSteps if HADOOP_CONF ConfigMap is defined.") { + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + hadoopConfSpec = Some( + HadoopConfSpec( + None, + Some("pre-defined-configMapName")))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) + } + + private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) - : Unit = { + : Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) stepTypes.foreach { stepType => 
assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType) http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 0e617b0..b336774 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -162,6 +162,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { } else { val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] val executorSpecificConf = k8sConf.roleSpecificConf + // TODO: HADOOP_CONF_DIR val expectedK8sConf = KubernetesConf.createExecutorConf( conf, executorSpecificConf.executorId, http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 44fe4a2..b572dac 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ class KubernetesExecutorBuilderSuite extends SparkFunSuite { @@ -27,6 +28,9 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val SECRETS_STEP_TYPE = "mount-secrets" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val LOCAL_DIRS_STEP_TYPE = "local-dirs" + private val HADOOP_CONF_STEP_TYPE = "hadoop-conf-step" + private val HADOOP_SPARK_USER_STEP_TYPE = "hadoop-spark-user" + private val KERBEROS_CONF_STEP_TYPE = "kerberos-step" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( @@ -37,6 +41,12 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep]) + private val hadoopConfStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + HADOOP_CONF_STEP_TYPE, classOf[HadoopConfExecutorFeatureStep]) + private val hadoopSparkUser = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + HADOOP_SPARK_USER_STEP_TYPE, classOf[HadoopSparkUserExecutorFeatureStep]) + private val kerberosConf = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + KERBEROS_CONF_STEP_TYPE, classOf[KerberosConfExecutorFeatureStep]) private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) @@ -45,7 +55,10 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { _ => mountSecretsStep, _ => 
envSecretsStep, _ => localDirsStep, - _ => mountVolumesStep) + _ => mountVolumesStep, + _ => hadoopConfStep, + _ => kerberosConf, + _ => hadoopSparkUser) test("Basic steps are consistently applied.") { val conf = KubernetesConf( @@ -60,7 +73,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) } @@ -78,7 +92,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map("secret-name" -> "secret-key"), Map.empty, Nil, - Seq.empty[String]) + Seq.empty[String], + None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -105,7 +120,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String]) + Seq.empty[String], + None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -113,6 +129,64 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { MOUNT_VOLUMES_STEP_TYPE) } + test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") { + // HADOOP_DELEGATION_TOKEN + val HADOOP_CREDS_PREFIX = "spark.security.credentials." 
+ val HADOOPFS_PROVIDER = s"$HADOOP_CREDS_PREFIX.hadoopfs.enabled" + val conf = KubernetesConf( + new SparkConf(false) + .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") + .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") + .set(KERBEROS_SPARK_USER_NAME, "spark-user") + .set(HADOOPFS_PROVIDER, "true"), + KubernetesExecutorSpecificConf( + "executor-id", Some(new PodBuilder().build())), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Some(HadoopConfSpec(Some("/var/hadoop-conf"), None))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + HADOOP_CONF_STEP_TYPE, + HADOOP_SPARK_USER_STEP_TYPE) + } + + test("Apply kerberos step if DT secrets created") { + val conf = KubernetesConf( + new SparkConf(false) + .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") + .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") + .set(KERBEROS_SPARK_USER_NAME, "spark-user") + .set(KERBEROS_DT_SECRET_NAME, "dt-secret") + .set(KERBEROS_DT_SECRET_KEY, "dt-key"), + KubernetesExecutorSpecificConf( + "executor-id", Some(new PodBuilder().build())), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Some(HadoopConfSpec(None, Some("pre-defined-onfigMapName")))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + HADOOP_CONF_STEP_TYPE, + KERBEROS_CONF_STEP_TYPE) + } + private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = { assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size) stepTypes.foreach { stepType => http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile ---------------------------------------------------------------------- diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 1c4dcd5..4bada0d 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -30,7 +30,7 @@ ARG k8s_tests=kubernetes/tests RUN set -ex && \ apk upgrade --no-cache && \ - apk add --no-cache bash tini libc6-compat linux-pam && \ + apk add --no-cache bash tini libc6-compat linux-pam krb5 krb5-libs && \ mkdir -p /opt/spark && \ mkdir -p /opt/spark/work-dir && \ touch /opt/spark/RELEASE && \ http://git-wip-us.apache.org/repos/asf/spark/blob/6c9c84ff/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 216e8fe..4958b73 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -83,6 +83,10 @@ elif [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then export PYSPARK_DRIVER_PYTHON="python3" fi +if ! [ -z ${HADOOP_CONF_DIR+x} ]; then + SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH"; +fi + case "$SPARK_K8S_CMD" in driver) CMD=( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
