Repository: spark Updated Branches: refs/heads/master 4f8dc6b01 -> 5415963d2
[SPARK-22131][MESOS] Mesos driver secrets

## Background

In #18837, ArtRand added Mesos secrets support to the dispatcher. **This PR is to add the same secrets support to the drivers.** This means if the secret configs are set, the driver will launch executors that have access to either env or file-based secrets. One use case for this is to support TLS in the driver <=> executor communication.

## What changes were proposed in this pull request?

Most of the changes are a refactor of the dispatcher secrets support (#18837), moving it to a common place that can be used by both the dispatcher and drivers. The same goes for the unit tests.

## How was this patch tested?

There are four config combinations: [env or file-based] x [value or reference secret]. For each combination:
- Added a unit test.
- Tested in DC/OS.

Author: Susan X. Huynh <xhu...@mesosphere.com>

Closes #19437 from susanxhuynh/sh-mesos-driver-secret.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5415963d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5415963d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5415963d

Branch: refs/heads/master
Commit: 5415963d2caaf95604211419ffc4e29fff38e1d7
Parents: 4f8dc6b
Author: Susan X. Huynh <xhu...@mesosphere.com>
Authored: Thu Oct 26 16:13:48 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Oct 26 16:13:48 2017 -0700

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 111 +++++++++++---
 .../org/apache/spark/deploy/mesos/config.scala  |  64 ++++----
 .../cluster/mesos/MesosClusterScheduler.scala   | 138 ++++-------------
 .../MesosCoarseGrainedSchedulerBackend.scala    |  31 +++-
 .../MesosFineGrainedSchedulerBackend.scala      |   4 +-
 .../mesos/MesosSchedulerBackendUtil.scala       |  92 +++++++++++-
 .../mesos/MesosClusterSchedulerSuite.scala      | 150 +++----------------
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  34 ++++-
 .../mesos/MesosSchedulerBackendUtilSuite.scala  |   7 +-
 .../spark/scheduler/cluster/mesos/Utils.scala   | 107 +++++++++++++
 10 files changed, 434 insertions(+), 304 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e0944bc..b7e3e64 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -485,39 +485,106 @@ See the [configuration page](configuration.html) for information on Spark config
 </tr>
 <tr>
-  <td><code>spark.mesos.driver.secret.envkeys</code></td>
-  <td><code>(none)</code></td>
   <td>
-    A comma-separated list that, if set, the contents of the secret referenced
-    by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-    set to the provided environment variable in the driver's process.
+    <code>spark.mesos.driver.secret.values</code>,
+    <code>spark.mesos.driver.secret.names</code>,
+    <code>spark.mesos.executor.secret.values</code>,
+    <code>spark.mesos.executor.secret.names</code>,
   </td>
-  </tr>
-  <tr>
-<td><code>spark.mesos.driver.secret.filenames</code></td>
   <td><code>(none)</code></td>
   <td>
-    A comma-separated list that, if set, the contents of the secret referenced by
-    spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-    written to the provided file. Paths are relative to the container's work
-    directory. Absolute paths must already exist. Consult the Mesos Secret
-    protobuf for more information.
+    <p>
+      A secret is specified by its contents and destination. These properties
+      specify a secret's contents. To specify a secret's destination, see the cell below.
+    </p>
+    <p>
+      You can specify a secret's contents either (1) by value or (2) by reference.
+    </p>
+    <p>
+      (1) To specify a secret by value, set the
+      <code>spark.mesos.[driver|executor].secret.values</code>
+      property, to make the secret available in the driver or executors.
+      For example, to make a secret password "guessme" available to the driver process, set:
+
+      <pre>spark.mesos.driver.secret.values=guessme</pre>
+    </p>
+    <p>
+      (2) To specify a secret that has been placed in a secret store
+      by reference, specify its name within the secret store
+      by setting the <code>spark.mesos.[driver|executor].secret.names</code>
+      property. For example, to make a secret password named "password" in a secret store
+      available to the driver process, set:
+
+      <pre>spark.mesos.driver.secret.names=password</pre>
+    </p>
+    <p>
+      Note: To use a secret store, make sure one has been integrated with Mesos via a custom
+      <a href="http://mesos.apache.org/documentation/latest/secrets/">SecretResolver
+      module</a>.
+    </p>
+    <p>
+      To specify multiple secrets, provide a comma-separated list:
+
+      <pre>spark.mesos.driver.secret.values=guessme,passwd123</pre>
+
+      or
+
+      <pre>spark.mesos.driver.secret.names=password1,password2</pre>
+    </p>
   </td>
 </tr>
+
 <tr>
-  <td><code>spark.mesos.driver.secret.names</code></td>
-  <td><code>(none)</code></td>
   <td>
-    A comma-separated list of secret references. Consult the Mesos Secret
-    protobuf for more information.
+    <code>spark.mesos.driver.secret.envkeys</code>,
+    <code>spark.mesos.driver.secret.filenames</code>,
+    <code>spark.mesos.executor.secret.envkeys</code>,
+    <code>spark.mesos.executor.secret.filenames</code>,
   </td>
-</tr>
-<tr>
-  <td><code>spark.mesos.driver.secret.values</code></td>
   <td><code>(none)</code></td>
   <td>
-    A comma-separated list of secret values. Consult the Mesos Secret
-    protobuf for more information.
+    <p>
+      A secret is specified by its contents and destination. These properties
+      specify a secret's destination. To specify a secret's contents, see the cell above.
+    </p>
+    <p>
+      You can specify a secret's destination in the driver or
+      executors as either (1) an environment variable or (2) as a file.
+    </p>
+    <p>
+      (1) To make an environment-based secret, set the
+      <code>spark.mesos.[driver|executor].secret.envkeys</code> property.
+      The secret will appear as an environment variable with the
+      given name in the driver or executors. For example, to make a secret password available
+      to the driver process as $PASSWORD, set:
+
+      <pre>spark.mesos.driver.secret.envkeys=PASSWORD</pre>
+    </p>
+    <p>
+      (2) To make a file-based secret, set the
+      <code>spark.mesos.[driver|executor].secret.filenames</code> property.
+      The secret will appear in the contents of a file with the given file name in
+      the driver or executors. For example, to make a secret password available in a
+      file named "pwdfile" in the driver process, set:
+
+      <pre>spark.mesos.driver.secret.filenames=pwdfile</pre>
+    </p>
+    <p>
+      Paths are relative to the container's work directory. Absolute paths must
+      already exist. Note: File-based secrets require a custom
+      <a href="http://mesos.apache.org/documentation/latest/secrets/">SecretResolver
+      module</a>.
+    </p>
+    <p>
+      To specify env vars or file names corresponding to multiple secrets,
+      provide a comma-separated list:
+
+      <pre>spark.mesos.driver.secret.envkeys=PASSWORD1,PASSWORD2</pre>
+
+      or
+
+      <pre>spark.mesos.driver.secret.filenames=pwdfile1,pwdfile2</pre>
+    </p>
   </td>
 </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 7e85de9..821534e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -23,6 +23,39 @@ import org.apache.spark.internal.config.ConfigBuilder

 package object config {

+  private[spark] class MesosSecretConfig private[config](taskType: String) {
+    private[spark] val SECRET_NAMES =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.names")
+        .doc("A comma-separated list of secret reference names. Consult the Mesos Secret " +
+          "protobuf for more information.")
+        .stringConf
+        .toSequence
+        .createOptional
+
+    private[spark] val SECRET_VALUES =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.values")
+        .doc("A comma-separated list of secret values.")
+        .stringConf
+        .toSequence
+        .createOptional
+
+    private[spark] val SECRET_ENVKEYS =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.envkeys")
+        .doc("A comma-separated list of the environment variables to contain the secrets." +
+          "The environment variable will be set on the driver.")
+        .stringConf
+        .toSequence
+        .createOptional
+
+    private[spark] val SECRET_FILENAMES =
+      ConfigBuilder(s"spark.mesos.$taskType.secret.filenames")
+        .doc("A comma-separated list of file paths secret will be written to. Consult the Mesos " +
+          "Secret protobuf for more information.")
+        .stringConf
+        .toSequence
+        .createOptional
+  }
+
   /* Common app configuration. */

   private[spark] val SHUFFLE_CLEANER_INTERVAL_S =
@@ -64,36 +97,9 @@ package object config {
       .stringConf
       .createOptional

-  private[spark] val SECRET_NAME =
-    ConfigBuilder("spark.mesos.driver.secret.names")
-      .doc("A comma-separated list of secret reference names. Consult the Mesos Secret protobuf " +
-        "for more information.")
-      .stringConf
-      .toSequence
-      .createOptional
-
-  private[spark] val SECRET_VALUE =
-    ConfigBuilder("spark.mesos.driver.secret.values")
-      .doc("A comma-separated list of secret values.")
-      .stringConf
-      .toSequence
-      .createOptional
+  private[spark] val driverSecretConfig = new MesosSecretConfig("driver")

-  private[spark] val SECRET_ENVKEY =
-    ConfigBuilder("spark.mesos.driver.secret.envkeys")
-      .doc("A comma-separated list of the environment variables to contain the secrets." +
-        "The environment variable will be set on the driver.")
-      .stringConf
-      .toSequence
-      .createOptional
-
-  private[spark] val SECRET_FILENAME =
-    ConfigBuilder("spark.mesos.driver.secret.filenames")
-      .doc("A comma-seperated list of file paths secret will be written to. Consult the Mesos " +
-        "Secret protobuf for more information.")
-      .stringConf
-      .toSequence
-      .createOptional
+  private[spark] val executorSecretConfig = new MesosSecretConfig("executor")

   private[spark] val DRIVER_FAILOVER_TIMEOUT =
     ConfigBuilder("spark.mesos.driver.failoverTimeout")

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index ec533f9..8247026 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -28,7 +28,6 @@ import org.apache.mesos.{Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
-import org.apache.mesos.protobuf.ByteString

 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.MesosDriverDescription
@@ -394,39 +393,20 @@ private[spark] class MesosClusterScheduler(
     }

     // add secret environment variables
-    getSecretEnvVar(desc).foreach { variable =>
-      if (variable.getSecret.getReference.isInitialized) {
-        logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName}" +
-          s"on file ${variable.getName}")
-      } else {
-        logInfo(s"Setting secret on environment variable name=${variable.getName}")
-      }
-      envBuilder.addVariables(variable)
+    MesosSchedulerBackendUtil.getSecretEnvVar(desc.conf, config.driverSecretConfig)
+      .foreach { variable =>
+        if (variable.getSecret.getReference.isInitialized) {
+          logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName} " +
+            s"on file ${variable.getName}")
+        } else {
+          logInfo(s"Setting secret on environment variable name=${variable.getName}")
+        }
+        envBuilder.addVariables(variable)
     }
     envBuilder.build()
   }

-  private def getSecretEnvVar(desc: MesosDriverDescription): List[Variable] = {
-    val secrets = getSecrets(desc)
-    val secretEnvKeys = desc.conf.get(config.SECRET_ENVKEY).getOrElse(Nil)
-    if (illegalSecretInput(secretEnvKeys, secrets)) {
-      throw new SparkException(
-        s"Need to give equal numbers of secrets and environment keys " +
-        s"for environment-based reference secrets got secrets $secrets, " +
-        s"and keys $secretEnvKeys")
-    }
-
-    secrets.zip(secretEnvKeys).map {
-      case (s, k) =>
-        Variable.newBuilder()
-          .setName(k)
-          .setType(Variable.Type.SECRET)
-          .setSecret(s)
-          .build
-    }.toList
-  }
-
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
     val confUris = List(conf.getOption("spark.mesos.uris"),
       desc.conf.getOption("spark.mesos.uris"),
@@ -440,6 +420,23 @@ private[spark] class MesosClusterScheduler(
         CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
   }

+  private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(desc.conf)
+
+    MesosSchedulerBackendUtil.getSecretVolume(desc.conf, config.driverSecretConfig)
+      .foreach { volume =>
+        if (volume.getSource.getSecret.getReference.isInitialized) {
+          logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName} " +
+            s"on file ${volume.getContainerPath}")
+        } else {
+          logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+        }
+        containerInfo.addVolumes(volume)
+      }
+
+    containerInfo
+  }
+
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {
     val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
     val executorUri = getDriverExecutorURI(desc)
@@ -579,89 +576,6 @@ private[spark] class MesosClusterScheduler(
       .build
   }

-  private def getContainerInfo(desc: MesosDriverDescription): ContainerInfo.Builder = {
-    val containerInfo = MesosSchedulerBackendUtil.containerInfo(desc.conf)
-
-    getSecretVolume(desc).foreach { volume =>
-      if (volume.getSource.getSecret.getReference.isInitialized) {
-        logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName}" +
-          s"on file ${volume.getContainerPath}")
-      } else {
-        logInfo(s"Setting secret on file name=${volume.getContainerPath}")
-      }
-      containerInfo.addVolumes(volume)
-    }
-
-    containerInfo
-  }
-
-
-  private def getSecrets(desc: MesosDriverDescription): Seq[Secret] = {
-    def createValueSecret(data: String): Secret = {
-      Secret.newBuilder()
-        .setType(Secret.Type.VALUE)
-        .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes)))
-        .build()
-    }
-
-    def createReferenceSecret(name: String): Secret = {
-      Secret.newBuilder()
-        .setReference(Secret.Reference.newBuilder().setName(name))
-        .setType(Secret.Type.REFERENCE)
-        .build()
-    }
-
-    val referenceSecrets: Seq[Secret] =
-      desc.conf.get(config.SECRET_NAME).getOrElse(Nil).map(s => createReferenceSecret(s))
-
-    val valueSecrets: Seq[Secret] = {
-      desc.conf.get(config.SECRET_VALUE).getOrElse(Nil).map(s => createValueSecret(s))
-    }
-
-    if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) {
-      throw new SparkException("Cannot specify VALUE type secrets and REFERENCE types ones")
-    }
-
-    if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets
-  }
-
-  private def illegalSecretInput(dest: Seq[String], s: Seq[Secret]): Boolean = {
-    if (dest.isEmpty) {  // no destination set (ie not using secrets of this type
-      return false
-    }
-    if (dest.nonEmpty && s.nonEmpty) {
-      // make sure there is a destination for each secret of this type
-      if (dest.length != s.length) {
-        return true
-      }
-    }
-    false
-  }
-
-  private def getSecretVolume(desc: MesosDriverDescription): List[Volume] = {
-    val secrets = getSecrets(desc)
-    val secretPaths: Seq[String] =
-      desc.conf.get(config.SECRET_FILENAME).getOrElse(Nil)
-
-    if (illegalSecretInput(secretPaths, secrets)) {
-      throw new SparkException(
-        s"Need to give equal numbers of secrets and file paths for file-based " +
-        s"reference secrets got secrets $secrets, and paths $secretPaths")
-    }
-
-    secrets.zip(secretPaths).map {
-      case (s, p) =>
-        val source = Volume.Source.newBuilder()
-          .setType(Volume.Source.Type.SECRET)
-          .setSecret(s)
-        Volume.newBuilder()
-          .setContainerPath(p)
-          .setSource(source)
-          .setMode(Volume.Mode.RO)
-          .build
-    }.toList
-  }
-
   /**
    * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
    * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 603c980..104ed01 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future

-import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
@@ -244,6 +244,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .setValue(value)
           .build())
     }
+
+    MesosSchedulerBackendUtil.getSecretEnvVar(conf, executorSecretConfig).foreach { variable =>
+      if (variable.getSecret.getReference.isInitialized) {
+        logInfo(s"Setting reference secret ${variable.getSecret.getReference.getName} " +
+          s"on file ${variable.getName}")
+      } else {
+        logInfo(s"Setting secret on environment variable name=${variable.getName}")
+      }
+      environment.addVariables(variable)
+    }
+
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
@@ -424,6 +435,22 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     }
   }

+  private def getContainerInfo(conf: SparkConf): ContainerInfo.Builder = {
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
+
+    MesosSchedulerBackendUtil.getSecretVolume(conf, executorSecretConfig).foreach { volume =>
+      if (volume.getSource.getSecret.getReference.isInitialized) {
+        logInfo(s"Setting reference secret ${volume.getSource.getSecret.getReference.getName} " +
+          s"on file ${volume.getContainerPath}")
+      } else {
+        logInfo(s"Setting secret on file name=${volume.getContainerPath}")
+      }
+      containerInfo.addVolumes(volume)
+    }
+
+    containerInfo
+  }
+
   /**
    * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize
    * per-task memory and IO, tasks are round-robin assigned to offers.
@@ -475,7 +502,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .setName(s"${sc.appName} $taskId")
           .setLabels(MesosProtoUtils.mesosLabels(taskLabels))
           .addAllResources(resourcesToUse.asJava)
-          .setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
+          .setContainer(getContainerInfo(sc.conf))

         tasks(offer.getId) ::= taskBuilder.build()
         remainingResources(offerId) = resourcesLeft.asJava

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 66b8e0a..d6d939d 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -28,6 +28,7 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.mesos.protobuf.ByteString

 import org.apache.spark.{SparkContext, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config
 import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -159,7 +160,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))

-    executorInfo.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
+    executorInfo.setContainer(
+      MesosSchedulerBackendUtil.buildContainerInfo(sc.conf))
     (executorInfo.build(), resourcesAfterMem.asJava)
   }
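The patch centralizes `getSecrets`, `illegalSecretInput`, `getSecretVolume`, and `getSecretEnvVar` in `MesosSchedulerBackendUtil` so the dispatcher and the drivers share one code path. The core rules are small enough to sketch outside of Spark; the following is a hypothetical Python model of the validation and pairing logic (function names mirror the Scala helpers for readability; this is an illustration, not the actual implementation):

```python
# Hypothetical model of the secret-handling rules this patch centralizes
# in MesosSchedulerBackendUtil. Not the actual Scala implementation.

def get_secrets(names, values):
    """A secret is either a reference (a name in a secret store) or a literal
    value; mixing the two kinds in one config is rejected, as in getSecrets."""
    if names and values:
        raise ValueError("Cannot specify both value-type and reference-type secrets.")
    if names:
        return [("REFERENCE", n) for n in names]
    return [("VALUE", v) for v in values]

def illegal_secret_input(dests, secrets):
    """Mirrors illegalSecretInput: a destination list, if present, must pair
    one-to-one with the secrets; an empty list means this type is unused."""
    return bool(dests) and len(dests) != len(secrets)

def get_secret_env_vars(names, values, env_keys):
    """Mirrors getSecretEnvVar: zip each secret with its destination env key."""
    secrets = get_secrets(names, values)
    if illegal_secret_input(env_keys, secrets):
        raise ValueError("Need equal numbers of secrets and environment keys")
    return dict(zip(env_keys, secrets))
```

For example, `get_secret_env_vars(["/path/to/secret"], [], ["PASSWORD"])` yields `{"PASSWORD": ("REFERENCE", "/path/to/secret")}`, which is the env-based reference-secret case exercised by the unit tests below. The same contents/destination split drives `getSecretVolume`, with file paths in place of env keys.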
http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index f29e541..bfb7361 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -17,11 +17,15 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume} +import org.apache.mesos.Protos.{ContainerInfo, Environment, Image, NetworkInfo, Parameter, Secret, Volume} import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo} +import org.apache.mesos.Protos.Environment.Variable +import org.apache.mesos.protobuf.ByteString -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.SparkConf +import org.apache.spark.SparkException import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME} +import org.apache.spark.deploy.mesos.config.MesosSecretConfig import org.apache.spark.internal.Logging /** @@ -122,7 +126,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { .toList } - def containerInfo(conf: SparkConf): ContainerInfo.Builder = { + def buildContainerInfo(conf: SparkConf): ContainerInfo.Builder = { val containerType = if (conf.contains("spark.mesos.executor.docker.image") && conf.get("spark.mesos.containerizer", "docker") == "docker") { ContainerInfo.Type.DOCKER @@ -173,6 +177,88 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { containerInfo } + private 
def getSecrets(conf: SparkConf, secretConfig: MesosSecretConfig): Seq[Secret] = { + def createValueSecret(data: String): Secret = { + Secret.newBuilder() + .setType(Secret.Type.VALUE) + .setValue(Secret.Value.newBuilder().setData(ByteString.copyFrom(data.getBytes))) + .build() + } + + def createReferenceSecret(name: String): Secret = { + Secret.newBuilder() + .setReference(Secret.Reference.newBuilder().setName(name)) + .setType(Secret.Type.REFERENCE) + .build() + } + + val referenceSecrets: Seq[Secret] = + conf.get(secretConfig.SECRET_NAMES).getOrElse(Nil).map { s => createReferenceSecret(s) } + + val valueSecrets: Seq[Secret] = { + conf.get(secretConfig.SECRET_VALUES).getOrElse(Nil).map { s => createValueSecret(s) } + } + + if (valueSecrets.nonEmpty && referenceSecrets.nonEmpty) { + throw new SparkException("Cannot specify both value-type and reference-type secrets.") + } + + if (referenceSecrets.nonEmpty) referenceSecrets else valueSecrets + } + + private def illegalSecretInput(dest: Seq[String], secrets: Seq[Secret]): Boolean = { + if (dest.nonEmpty) { + // make sure there is a one-to-one correspondence between destinations and secrets + if (dest.length != secrets.length) { + return true + } + } + false + } + + def getSecretVolume(conf: SparkConf, secretConfig: MesosSecretConfig): List[Volume] = { + val secrets = getSecrets(conf, secretConfig) + val secretPaths: Seq[String] = + conf.get(secretConfig.SECRET_FILENAMES).getOrElse(Nil) + + if (illegalSecretInput(secretPaths, secrets)) { + throw new SparkException( + s"Need to give equal numbers of secrets and file paths for file-based " + + s"reference secrets got secrets $secrets, and paths $secretPaths") + } + + secrets.zip(secretPaths).map { case (s, p) => + val source = Volume.Source.newBuilder() + .setType(Volume.Source.Type.SECRET) + .setSecret(s) + Volume.newBuilder() + .setContainerPath(p) + .setSource(source) + .setMode(Volume.Mode.RO) + .build + }.toList + } + + def getSecretEnvVar(conf: SparkConf, 
secretConfig: MesosSecretConfig): + List[Variable] = { + val secrets = getSecrets(conf, secretConfig) + val secretEnvKeys = conf.get(secretConfig.SECRET_ENVKEYS).getOrElse(Nil) + if (illegalSecretInput(secretEnvKeys, secrets)) { + throw new SparkException( + s"Need to give equal numbers of secrets and environment keys " + + s"for environment-based reference secrets got secrets $secrets, " + + s"and keys $secretEnvKeys") + } + + secrets.zip(secretEnvKeys).map { case (s, k) => + Variable.newBuilder() + .setName(k) + .setType(Variable.Type.SECRET) + .setSecret(s) + .build + }.toList + } + private def dockerInfo( image: String, forcePullImage: Boolean, http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index ff63e3f..77acee6 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import org.apache.mesos.Protos.{Environment, Secret, TaskState => MesosTaskState, _} import org.apache.mesos.Protos.Value.{Scalar, Type} import org.apache.mesos.SchedulerDriver -import org.apache.mesos.protobuf.ByteString import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Mockito._ import org.scalatest.mockito.MockitoSugar @@ -32,6 +31,7 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} import org.apache.spark.deploy.Command import 
org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.deploy.mesos.config class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { @@ -341,132 +341,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi } test("Creates an env-based reference secrets.") { - setScheduler() - - val mem = 1000 - val cpu = 1 - val secretName = "/path/to/secret,/anothersecret" - val envKey = "SECRET_ENV_KEY,PASSWORD" - val driverDesc = new MesosDriverDescription( - "d1", - "jar", - mem, - cpu, - true, - command, - Map("spark.mesos.executor.home" -> "test", - "spark.app.name" -> "test", - "spark.mesos.driver.secret.names" -> secretName, - "spark.mesos.driver.secret.envkeys" -> envKey), - "s1", - new Date()) - val response = scheduler.submitDriver(driverDesc) - assert(response.success) - val offer = Utils.createOffer("o1", "s1", mem, cpu) - scheduler.resourceOffers(driver, Collections.singletonList(offer)) - val launchedTasks = Utils.verifyTaskLaunched(driver, "o1") - assert(launchedTasks.head - .getCommand - .getEnvironment - .getVariablesCount == 3) // SPARK_SUBMIT_OPS and the secret - val variableOne = launchedTasks.head.getCommand.getEnvironment - .getVariablesList.asScala.filter(_.getName == "SECRET_ENV_KEY").head - assert(variableOne.getSecret.isInitialized) - assert(variableOne.getSecret.getType == Secret.Type.REFERENCE) - assert(variableOne.getSecret.getReference.getName == "/path/to/secret") - assert(variableOne.getType == Environment.Variable.Type.SECRET) - val variableTwo = launchedTasks.head.getCommand.getEnvironment - .getVariablesList.asScala.filter(_.getName == "PASSWORD").head - assert(variableTwo.getSecret.isInitialized) - assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE) - assert(variableTwo.getSecret.getReference.getName == "/anothersecret") - assert(variableTwo.getType == Environment.Variable.Type.SECRET) + val launchedTasks = launchDriverTask( + 
Utils.configEnvBasedRefSecrets(config.driverSecretConfig)) + Utils.verifyEnvBasedRefSecrets(launchedTasks) } test("Creates an env-based value secrets.") { - setScheduler() - val mem = 1000 - val cpu = 1 - val secretValues = "user,password" - val envKeys = "USER,PASSWORD" - val driverDesc = new MesosDriverDescription( - "d1", - "jar", - mem, - cpu, - true, - command, - Map("spark.mesos.executor.home" -> "test", - "spark.app.name" -> "test", - "spark.mesos.driver.secret.values" -> secretValues, - "spark.mesos.driver.secret.envkeys" -> envKeys), - "s1", - new Date()) - val response = scheduler.submitDriver(driverDesc) - assert(response.success) - val offer = Utils.createOffer("o1", "s1", mem, cpu) - scheduler.resourceOffers(driver, Collections.singletonList(offer)) - val launchedTasks = Utils.verifyTaskLaunched(driver, "o1") - assert(launchedTasks.head - .getCommand - .getEnvironment - .getVariablesCount == 3) // SPARK_SUBMIT_OPS and the secret - val variableOne = launchedTasks.head.getCommand.getEnvironment - .getVariablesList.asScala.filter(_.getName == "USER").head - assert(variableOne.getSecret.isInitialized) - assert(variableOne.getSecret.getType == Secret.Type.VALUE) - assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes)) - assert(variableOne.getType == Environment.Variable.Type.SECRET) - val variableTwo = launchedTasks.head.getCommand.getEnvironment - .getVariablesList.asScala.filter(_.getName == "PASSWORD").head - assert(variableTwo.getSecret.isInitialized) - assert(variableTwo.getSecret.getType == Secret.Type.VALUE) - assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes)) - assert(variableTwo.getType == Environment.Variable.Type.SECRET) + val launchedTasks = launchDriverTask( + Utils.configEnvBasedValueSecrets(config.driverSecretConfig)) + Utils.verifyEnvBasedValueSecrets(launchedTasks) } test("Creates file-based reference secrets.") { - setScheduler() - val mem = 1000 - val cpu = 1 - val 
secretName = "/path/to/secret,/anothersecret" - val secretPath = "/topsecret,/mypassword" - val driverDesc = new MesosDriverDescription( - "d1", - "jar", - mem, - cpu, - true, - command, - Map("spark.mesos.executor.home" -> "test", - "spark.app.name" -> "test", - "spark.mesos.driver.secret.names" -> secretName, - "spark.mesos.driver.secret.filenames" -> secretPath), - "s1", - new Date()) - val response = scheduler.submitDriver(driverDesc) - assert(response.success) - val offer = Utils.createOffer("o1", "s1", mem, cpu) - scheduler.resourceOffers(driver, Collections.singletonList(offer)) - val launchedTasks = Utils.verifyTaskLaunched(driver, "o1") - val volumes = launchedTasks.head.getContainer.getVolumesList - assert(volumes.size() == 2) - val secretVolOne = volumes.get(0) - assert(secretVolOne.getContainerPath == "/topsecret") - assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE) - assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret") - val secretVolTwo = volumes.get(1) - assert(secretVolTwo.getContainerPath == "/mypassword") - assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE) - assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret") + val launchedTasks = launchDriverTask( + Utils.configFileBasedRefSecrets(config.driverSecretConfig)) + Utils.verifyFileBasedRefSecrets(launchedTasks) } test("Creates a file-based value secrets.") { + val launchedTasks = launchDriverTask( + Utils.configFileBasedValueSecrets(config.driverSecretConfig)) + Utils.verifyFileBasedValueSecrets(launchedTasks) + } + + private def launchDriverTask(addlSparkConfVars: Map[String, String]): List[TaskInfo] = { setScheduler() val mem = 1000 val cpu = 1 - val secretValues = "user,password" - val secretPath = "/whoami,/mypassword" val driverDesc = new MesosDriverDescription( "d1", "jar", @@ -475,27 +376,14 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi true, command, 
       Map("spark.mesos.executor.home" -> "test",
-        "spark.app.name" -> "test",
-        "spark.mesos.driver.secret.values" -> secretValues,
-        "spark.mesos.driver.secret.filenames" -> secretPath),
+        "spark.app.name" -> "test") ++
+        addlSparkConfVars,
       "s1",
       new Date())
     val response = scheduler.submitDriver(driverDesc)
     assert(response.success)
     val offer = Utils.createOffer("o1", "s1", mem, cpu)
     scheduler.resourceOffers(driver, Collections.singletonList(offer))
-    val launchedTasks = Utils.verifyTaskLaunched(driver, "o1")
-    val volumes = launchedTasks.head.getContainer.getVolumesList
-    assert(volumes.size() == 2)
-    val secretVolOne = volumes.get(0)
-    assert(secretVolOne.getContainerPath == "/whoami")
-    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
-    assert(secretVolOne.getSource.getSecret.getValue.getData ==
-      ByteString.copyFrom("user".getBytes))
-    val secretVolTwo = volumes.get(1)
-    assert(secretVolTwo.getContainerPath == "/mypassword")
-    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
-    assert(secretVolTwo.getSource.getSecret.getValue.getData ==
-      ByteString.copyFrom("password".getBytes))
+    Utils.verifyTaskLaunched(driver, "o1")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 6c40792..f4bd1ee 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit

 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.reflect.ClassTag

 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
@@ -38,7 +37,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor}
 import org.apache.spark.scheduler.cluster.mesos.Utils._

 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
@@ -653,6 +652,37 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     offerResourcesAndVerify(2, true)
   }

+  test("Creates an env-based reference secrets.") {
+    val launchedTasks = launchExecutorTasks(configEnvBasedRefSecrets(executorSecretConfig))
+    verifyEnvBasedRefSecrets(launchedTasks)
+  }
+
+  test("Creates an env-based value secrets.") {
+    val launchedTasks = launchExecutorTasks(configEnvBasedValueSecrets(executorSecretConfig))
+    verifyEnvBasedValueSecrets(launchedTasks)
+  }
+
+  test("Creates file-based reference secrets.") {
+    val launchedTasks = launchExecutorTasks(configFileBasedRefSecrets(executorSecretConfig))
+    verifyFileBasedRefSecrets(launchedTasks)
+  }
+
+  test("Creates a file-based value secrets.") {
+    val launchedTasks = launchExecutorTasks(configFileBasedValueSecrets(executorSecretConfig))
+    verifyFileBasedValueSecrets(launchedTasks)
+  }
+
+  private def launchExecutorTasks(sparkConfVars: Map[String, String]): List[TaskInfo] = {
+    setBackend(sparkConfVars)
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    verifyTaskLaunched(driver, "o1")
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

   private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
index f49d7c2..442c439 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos

 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.mesos.config

 class MesosSchedulerBackendUtilSuite extends SparkFunSuite {

@@ -26,7 +27,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     conf.set("spark.mesos.executor.docker.parameters", "a,b")
     conf.set("spark.mesos.executor.docker.image", "test")

-    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(
+      conf)
     val params = containerInfo.getDocker.getParametersList

     assert(params.size() == 0)
@@ -37,7 +39,8 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
     conf.set("spark.mesos.executor.docker.image", "test")

-    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(
+      conf)
     val params = containerInfo.getDocker.getParametersList

     assert(params.size() == 3)
     assert(params.get(0).getKey == "a")

http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 833db0c..5636ac5 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -24,9 +24,12 @@ import scala.collection.JavaConverters._
 import org.apache.mesos.Protos._
 import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
 import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.protobuf.ByteString
 import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Mockito._

+import org.apache.spark.deploy.mesos.config.MesosSecretConfig
+
 object Utils {

   val TEST_FRAMEWORK_ID = FrameworkID.newBuilder()
@@ -105,4 +108,108 @@ object Utils {
   def createTaskId(taskId: String): TaskID = {
     TaskID.newBuilder().setValue(taskId).build()
   }
+
+  def configEnvBasedRefSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretName = "/path/to/secret,/anothersecret"
+    val envKey = "SECRET_ENV_KEY,PASSWORD"
+    Map(
+      secretConfig.SECRET_NAMES.key -> secretName,
+      secretConfig.SECRET_ENVKEYS.key -> envKey
+    )
+  }
+
+  def verifyEnvBasedRefSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val envVars = launchedTasks.head
+      .getCommand
+      .getEnvironment
+      .getVariablesList
+      .asScala
+    assert(envVars
+      .count(!_.getName.startsWith("SPARK_")) == 2)  // user-defined secret env vars
+    val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
+    assert(variableOne.getSecret.isInitialized)
+    assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
+    assert(variableOne.getSecret.getReference.getName == "/path/to/secret")
+    assert(variableOne.getType == Environment.Variable.Type.SECRET)
+    val variableTwo = envVars.filter(_.getName == "PASSWORD").head
+    assert(variableTwo.getSecret.isInitialized)
+    assert(variableTwo.getSecret.getType == Secret.Type.REFERENCE)
+    assert(variableTwo.getSecret.getReference.getName == "/anothersecret")
+    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+  }
+
+  def configEnvBasedValueSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretValues = "user,password"
+    val envKeys = "USER,PASSWORD"
+    Map(
+      secretConfig.SECRET_VALUES.key -> secretValues,
+      secretConfig.SECRET_ENVKEYS.key -> envKeys
+    )
+  }
+
+  def verifyEnvBasedValueSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val envVars = launchedTasks.head
+      .getCommand
+      .getEnvironment
+      .getVariablesList
+      .asScala
+    assert(envVars
+      .count(!_.getName.startsWith("SPARK_")) == 2)  // user-defined secret env vars
+    val variableOne = envVars.filter(_.getName == "USER").head
+    assert(variableOne.getSecret.isInitialized)
+    assert(variableOne.getSecret.getType == Secret.Type.VALUE)
+    assert(variableOne.getSecret.getValue.getData == ByteString.copyFrom("user".getBytes))
+    assert(variableOne.getType == Environment.Variable.Type.SECRET)
+    val variableTwo = envVars.filter(_.getName == "PASSWORD").head
+    assert(variableTwo.getSecret.isInitialized)
+    assert(variableTwo.getSecret.getType == Secret.Type.VALUE)
+    assert(variableTwo.getSecret.getValue.getData == ByteString.copyFrom("password".getBytes))
+    assert(variableTwo.getType == Environment.Variable.Type.SECRET)
+  }
+
+  def configFileBasedRefSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretName = "/path/to/secret,/anothersecret"
+    val secretPath = "/topsecret,/mypassword"
+    Map(
+      secretConfig.SECRET_NAMES.key -> secretName,
+      secretConfig.SECRET_FILENAMES.key -> secretPath
+    )
+  }
+
+  def verifyFileBasedRefSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val volumes = launchedTasks.head.getContainer.getVolumesList
+    assert(volumes.size() == 2)
+    val secretVolOne = volumes.get(0)
+    assert(secretVolOne.getContainerPath == "/topsecret")
+    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.REFERENCE)
+    assert(secretVolOne.getSource.getSecret.getReference.getName == "/path/to/secret")
+    val secretVolTwo = volumes.get(1)
+    assert(secretVolTwo.getContainerPath == "/mypassword")
+    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.REFERENCE)
+    assert(secretVolTwo.getSource.getSecret.getReference.getName == "/anothersecret")
+  }
+
+  def configFileBasedValueSecrets(secretConfig: MesosSecretConfig): Map[String, String] = {
+    val secretValues = "user,password"
+    val secretPath = "/whoami,/mypassword"
+    Map(
+      secretConfig.SECRET_VALUES.key -> secretValues,
+      secretConfig.SECRET_FILENAMES.key -> secretPath
+    )
+  }
+
+  def verifyFileBasedValueSecrets(launchedTasks: List[TaskInfo]): Unit = {
+    val volumes = launchedTasks.head.getContainer.getVolumesList
+    assert(volumes.size() == 2)
+    val secretVolOne = volumes.get(0)
+    assert(secretVolOne.getContainerPath == "/whoami")
+    assert(secretVolOne.getSource.getSecret.getType == Secret.Type.VALUE)
+    assert(secretVolOne.getSource.getSecret.getValue.getData ==
+      ByteString.copyFrom("user".getBytes))
+    val secretVolTwo = volumes.get(1)
+    assert(secretVolTwo.getContainerPath == "/mypassword")
+    assert(secretVolTwo.getSource.getSecret.getType == Secret.Type.VALUE)
+    assert(secretVolTwo.getSource.getSecret.getValue.getData ==
+      ByteString.copyFrom("password".getBytes))
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
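For readers following the refactor, the shared config-builder/verifier pattern that this commit moves into Utils.scala can be sketched in isolation. This is a minimal stand-alone sketch, not the Spark code: the Mesos Secret/TaskInfo protobufs are replaced by plain case classes (SecretSpec, TaskSpec are illustrative names), so it runs without any Mesos dependency.

```scala
// Sketch of the fixture pattern: one helper builds the Spark conf entries for a
// secret combination, another asserts on the task a scheduler would launch.
// Both helpers take the config prefix, mirroring how MesosSecretConfig lets the
// dispatcher and driver suites share the same fixtures.
object SecretHelperSketch {

  // Simplified stand-ins for the Mesos protobuf types used in the real tests.
  case class SecretSpec(kind: String, name: String, containerPath: String)
  case class TaskSpec(volumes: List[SecretSpec])

  // Analogous to configFileBasedRefSecrets: comma-separated names and paths.
  def configFileBasedRefSecrets(prefix: String): Map[String, String] = Map(
    s"$prefix.secret.names" -> "/path/to/secret,/anothersecret",
    s"$prefix.secret.filenames" -> "/topsecret,/mypassword"
  )

  // Stand-in for the scheduler: zip each secret name with its container path.
  def buildTask(conf: Map[String, String], prefix: String): TaskSpec = {
    val names = conf(s"$prefix.secret.names").split(",")
    val paths = conf(s"$prefix.secret.filenames").split(",")
    TaskSpec(names.zip(paths).map { case (n, p) =>
      SecretSpec("REFERENCE", n, p)
    }.toList)
  }

  // Analogous to verifyFileBasedRefSecrets: one volume per configured secret.
  def verifyFileBasedRefSecrets(task: TaskSpec): Unit = {
    assert(task.volumes.size == 2)
    assert(task.volumes.head == SecretSpec("REFERENCE", "/path/to/secret", "/topsecret"))
    assert(task.volumes(1) == SecretSpec("REFERENCE", "/anothersecret", "/mypassword"))
  }

  def main(args: Array[String]): Unit = {
    val prefix = "spark.mesos.driver"
    verifyFileBasedRefSecrets(buildTask(configFileBasedRefSecrets(prefix), prefix))
  }
}
```

Because both the config maps and the assertions live behind a prefix parameter, each of the four [env|file] x [reference|value] combinations reduces to a two-line test in either suite, which is the shape of the tests added above.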