Repository: spark
Updated Branches:
refs/heads/master da2dc6929 -> ba84bcb2c
[SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
## What changes were proposed in this pull request?
Introducing R Bindings for SparkR on K8s
- [x] Running SparkR Job
## How was this patch tested?
This patch was tested with
- [x] Unit Tests
- [x] Integration Tests
## Example:
Commands to run example spark job:
1. `dev/make-distribution.sh --pip --r --tgz -Psparkr -Phadoop-2.7 -Pkubernetes`
2. `bin/docker-image-tool.sh -m -t testing build`
3.
```
bin/spark-submit \
--master k8s://https://192.168.64.33:8443 \
--deploy-mode cluster \
--name spark-r \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=spark-r:testing \
local:///opt/spark/examples/src/main/r/dataframe.R
```
The above spark-submit command works given the distribution. (Will include
this integration test in a follow-up PR once the PRB is ready.)
Author: Ilan Filonenko <[email protected]>
Closes #21584 from ifilonenko/spark-r.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba84bcb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba84bcb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba84bcb2
Branch: refs/heads/master
Commit: ba84bcb2c4f73baf63782ff6fad5a607008c7cd2
Parents: da2dc69
Author: Ilan Filonenko <[email protected]>
Authored: Fri Aug 17 16:04:02 2018 -0700
Committer: mcheah <[email protected]>
Committed: Fri Aug 17 16:04:02 2018 -0700
----------------------------------------------------------------------
bin/docker-image-tool.sh | 23 ++++---
.../org/apache/spark/deploy/SparkSubmit.scala | 8 ++-
.../org/apache/spark/deploy/k8s/Config.scala | 13 ++++
.../org/apache/spark/deploy/k8s/Constants.scala | 2 +
.../spark/deploy/k8s/KubernetesConf.scala | 8 ++-
.../features/bindings/RDriverFeatureStep.scala | 59 ++++++++++++++++++
.../submit/KubernetesClientApplication.scala | 2 +
.../k8s/submit/KubernetesDriverBuilder.scala | 22 ++++---
.../deploy/k8s/submit/MainAppResource.scala | 3 +
.../spark/deploy/k8s/KubernetesConfSuite.scala | 22 +++++++
.../bindings/RDriverFeatureStepSuite.scala | 63 ++++++++++++++++++++
.../submit/KubernetesDriverBuilderSuite.scala | 36 ++++++++++-
.../dockerfiles/spark/bindings/R/Dockerfile | 29 +++++++++
.../src/main/dockerfiles/spark/entrypoint.sh | 14 ++++-
.../integrationtest/ClientModeTestsSuite.scala | 2 +-
.../k8s/integrationtest/KubernetesSuite.scala | 21 ++++++-
.../k8s/integrationtest/RTestsSuite.scala | 44 ++++++++++++++
17 files changed, 344 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/bin/docker-image-tool.sh
----------------------------------------------------------------------
diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index cd22e75..d637105 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -71,6 +71,7 @@ function build {
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local
PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
+ local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"}
docker build $NOCACHEARG "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
@@ -79,11 +80,16 @@ function build {
docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
+
+ docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \
+ -t $(image_ref spark-r) \
+ -f "$RDOCKERFILE" .
}
function push {
docker push "$(image_ref spark)"
docker push "$(image_ref spark-py)"
+ docker push "$(image_ref spark-r)"
}
function usage {
@@ -97,12 +103,13 @@ Commands:
push Push a pre-built image to a registry. Requires a repository
address to be provided.
Options:
- -f file Dockerfile to build for JVM based Jobs. By default builds the
Dockerfile shipped with Spark.
- -p file Dockerfile with Python baked in. By default builds the
Dockerfile shipped with Spark.
- -r repo Repository address.
- -t tag Tag to apply to the built image, or to identify the image to be
pushed.
- -m Use minikube's Docker daemon.
- -n Build docker image with --no-cache
+ -f file Dockerfile to build for JVM based Jobs. By default
builds the Dockerfile shipped with Spark.
+ -p file Dockerfile to build for PySpark Jobs. Builds Python
dependencies and ships with Spark.
+ -R file Dockerfile to build for SparkR Jobs. Builds R
dependencies and ships with Spark.
+ -r repo Repository address.
+ -t tag Tag to apply to the built image, or to identify the
image to be pushed.
+ -m Use minikube's Docker daemon.
+ -n Build docker image with --no-cache
-b arg Build arg to build or push the image. For multiple build args,
this option needs to
be used separately for each build arg.
@@ -133,14 +140,16 @@ REPO=
TAG=
BASEDOCKERFILE=
PYDOCKERFILE=
+RDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
-while getopts f:p:mr:t:n:b: option
+while getopts f:p:R:mr:t:n:b: option
do
case "${option}"
in
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
+ R) RDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
n) NOCACHEARG="--no-cache";;
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6e70bcd..cf902db 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -286,8 +286,6 @@ private[spark] class SparkSubmit extends Logging {
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
- case (KUBERNETES, _) if args.isR =>
- error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
error("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -700,7 +698,11 @@ private[spark] class SparkSubmit extends Logging {
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
- } else {
+ } else if (args.isR) {
+ childArgs ++= Array("--primary-r-file", args.primaryResource)
+ childArgs ++= Array("--main-class",
"org.apache.spark.deploy.RRunner")
+ }
+ else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 4442333..1b582fe 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -139,6 +139,19 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional
+ val KUBERNETES_R_MAIN_APP_RESOURCE =
+ ConfigBuilder("spark.kubernetes.r.mainAppResource")
+ .doc("The main app resource for SparkR jobs")
+ .internal()
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_R_APP_ARGS =
+ ConfigBuilder("spark.kubernetes.r.appArgs")
+ .doc("The app arguments for SparkR Jobs")
+ .internal()
+ .stringConf
+ .createOptional
val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index f82cd7f..8202d87 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -71,6 +71,8 @@ private[spark] object Constants {
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
+ val ENV_R_PRIMARY = "R_PRIMARY"
+ val ENV_R_ARGS = "R_APP_ARGS"
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 866ba3c..3aa35d4 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -78,6 +78,9 @@ private[spark] case class KubernetesConf[T <:
KubernetesRoleSpecificConf](
def pySparkPythonVersion(): String = sparkConf
.get(PYSPARK_MAJOR_PYTHON_VERSION)
+ def sparkRMainResource(): Option[String] = sparkConf
+ .get(KUBERNETES_R_MAIN_APP_RESOURCE)
+
def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
def imagePullSecrets(): Seq[LocalObjectReference] = {
@@ -125,7 +128,7 @@ private[spark] object KubernetesConf {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
// The function of this outer match is to account for multiple nonJVM
- // bindings that will all have increased MEMORY_OVERHEAD_FACTOR to 0.4
+ // bindings that will all have increased default
MEMORY_OVERHEAD_FACTOR to 0.4
case nonJVM: NonJVMResource =>
nonJVM match {
case PythonMainAppResource(res) =>
@@ -133,6 +136,9 @@ private[spark] object KubernetesConf {
maybePyFiles.foreach{maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))}
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+ case RMainAppResource(res) =>
+ additionalFiles += res
+ sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
new file mode 100644
index 0000000..b33b86e
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder,
HasMetadata}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf,
KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
+
+private[spark] class RDriverFeatureStep(
+ kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf])
+ extends KubernetesFeatureConfigStep {
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val roleConf = kubernetesConf.roleSpecificConf
+ require(roleConf.mainAppResource.isDefined, "R Main Resource must be
defined")
+ val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map(
+ rArgs =>
+ new EnvVarBuilder()
+ .withName(ENV_R_ARGS)
+ .withValue(rArgs.mkString(","))
+ .build())
+ val envSeq =
+ Seq(new EnvVarBuilder()
+ .withName(ENV_R_PRIMARY)
+
.withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get))
+ .build())
+ val rEnvs = envSeq ++
+ maybeRArgs.toSeq
+
+ val withRPrimaryContainer = new ContainerBuilder(pod.container)
+ .addAllToEnv(rEnvs.asJava)
+ .addToArgs("driver-r")
+ .addToArgs("--properties-file", SPARK_CONF_PATH)
+ .addToArgs("--class", roleConf.mainClass)
+ .build()
+
+ SparkPod(pod.pod, withRPrimaryContainer)
+ }
+ override def getAdditionalPodSystemProperties(): Map[String, String] =
Map.empty
+
+ override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 9398fae..986c950 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -60,6 +60,8 @@ private[spark] object ClientArguments {
mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
case Array("--primary-py-file", primaryPythonResource: String) =>
mainAppResource = Some(PythonMainAppResource(primaryPythonResource))
+ case Array("--primary-r-file", primaryRFile: String) =>
+ mainAppResource = Some(RMainAppResource(primaryRFile))
case Array("--other-py-files", pyFiles: String) =>
maybePyFiles = Some(pyFiles)
case Array("--main-class", clazz: String) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index 7208e3d..8f3f18f 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -17,8 +17,8 @@
package org.apache.spark.deploy.k8s.submit
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec,
KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
-import org.apache.spark.deploy.k8s.features._
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep,
PythonDriverFeatureStep}
+import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep,
DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep,
EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep,
MountVolumesFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep,
PythonDriverFeatureStep, RDriverFeatureStep}
private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) =>
BasicDriverFeatureStep =
@@ -40,14 +40,18 @@ private[spark] class KubernetesDriverBuilder(
provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountVolumesFeatureStep) =
new MountVolumesFeatureStep(_),
- provideJavaStep: (
- KubernetesConf[KubernetesDriverSpecificConf]
- => JavaDriverFeatureStep) =
- new JavaDriverFeatureStep(_),
providePythonStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> PythonDriverFeatureStep) =
- new PythonDriverFeatureStep(_)) {
+ new PythonDriverFeatureStep(_),
+ provideRStep: (
+ KubernetesConf[KubernetesDriverSpecificConf]
+ => RDriverFeatureStep) =
+ new RDriverFeatureStep(_),
+ provideJavaStep: (
+ KubernetesConf[KubernetesDriverSpecificConf]
+ => JavaDriverFeatureStep) =
+ new JavaDriverFeatureStep(_)) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]):
KubernetesDriverSpec = {
@@ -71,7 +75,9 @@ private[spark] class KubernetesDriverBuilder(
case JavaMainAppResource(_) =>
provideJavaStep(kubernetesConf)
case PythonMainAppResource(_) =>
- providePythonStep(kubernetesConf)}
+ providePythonStep(kubernetesConf)
+ case RMainAppResource(_) =>
+ provideRStep(kubernetesConf)}
.getOrElse(provideJavaStep(kubernetesConf))
val allFeatures = (baseFeatures :+ bindingsStep) ++
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
index cbe081a..dd5a454 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala
@@ -24,3 +24,6 @@ private[spark] case class
JavaMainAppResource(primaryResource: String) extends M
private[spark] case class PythonMainAppResource(primaryResource: String)
extends MainAppResource with NonJVMResource
+
+private[spark] case class RMainAppResource(primaryResource: String)
+ extends MainAppResource with NonJVMResource
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index ecdb713..e3c19cd 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -122,6 +122,28 @@ class KubernetesConfSuite extends SparkFunSuite {
=== Array("local:///opt/spark/example4.py", mainResourceFile) ++
inputPyFiles)
}
+ test("Creating driver conf with a r primary file") {
+ val mainResourceFile = "local:///opt/spark/main.R"
+ val sparkConf = new SparkConf(false)
+ .setJars(Seq("local:///opt/spark/jar1.jar"))
+ .set("spark.files", "local:///opt/spark/example2.R")
+ val mainAppResource = Some(RMainAppResource(mainResourceFile))
+ val kubernetesConfWithMainResource = KubernetesConf.createDriverConf(
+ sparkConf,
+ APP_NAME,
+ RESOURCE_NAME_PREFIX,
+ APP_ID,
+ mainAppResource,
+ MAIN_CLASS,
+ APP_ARGS,
+ maybePyFiles = None)
+
assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",")
+ === Array("local:///opt/spark/jar1.jar"))
+
assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) ===
0.4)
+ assert(kubernetesConfWithMainResource.sparkFiles
+ === Array("local:///opt/spark/example2.R", mainResourceFile))
+ }
+
test("Testing explicit setting of memory overhead on non-JVM tasks") {
val sparkConf = new SparkConf(false)
.set(MEMORY_OVERHEAD_FACTOR, 0.3)
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
new file mode 100644
index 0000000..8fdf91e
--- /dev/null
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features.bindings
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesConf,
KubernetesDriverSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.RMainAppResource
+
+class RDriverFeatureStepSuite extends SparkFunSuite {
+
+ test("R Step modifies container correctly") {
+ val expectedMainResource = "/main.R"
+ val mainResource = "local:///main.R"
+ val baseDriverPod = SparkPod.initialPod()
+ val sparkConf = new SparkConf(false)
+ .set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource)
+ val kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(RMainAppResource(mainResource)),
+ "test-app",
+ "r-runner",
+ Seq("5 7")),
+ appResourceNamePrefix = "",
+ appId = "",
+ roleLabels = Map.empty,
+ roleAnnotations = Map.empty,
+ roleSecretNamesToMountPaths = Map.empty,
+ roleSecretEnvNamesToKeyRefs = Map.empty,
+ roleEnvs = Map.empty,
+ roleVolumes = Seq.empty,
+ sparkFiles = Seq.empty[String])
+
+ val step = new RDriverFeatureStep(kubernetesConf)
+ val driverContainerwithR = step.configurePod(baseDriverPod).container
+ assert(driverContainerwithR.getEnv.size === 2)
+ val envs = driverContainerwithR
+ .getEnv
+ .asScala
+ .map(env => (env.getName, env.getValue))
+ .toMap
+ assert(envs(ENV_R_PRIMARY) === expectedMainResource)
+ assert(envs(ENV_R_ARGS) === "5 7")
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 046e578..4117c54 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -20,7 +20,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep,
DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep,
EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep,
MountSecretsFeatureStep}
-import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep,
PythonDriverFeatureStep}
+import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep,
PythonDriverFeatureStep, RDriverFeatureStep}
class KubernetesDriverBuilderSuite extends SparkFunSuite {
@@ -31,6 +31,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val SECRETS_STEP_TYPE = "mount-secrets"
private val JAVA_STEP_TYPE = "java-bindings"
private val PYSPARK_STEP_TYPE = "pyspark-bindings"
+ private val R_STEP_TYPE = "r-bindings"
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
@@ -55,6 +56,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val pythonStep =
KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep])
+ private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ R_STEP_TYPE, classOf[RDriverFeatureStep])
+
private val envSecretsStep =
KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep])
@@ -70,8 +74,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
_ => envSecretsStep,
_ => localDirsStep,
_ => mountVolumesStep,
- _ => javaStep,
- _ => pythonStep)
+ _ => pythonStep,
+ _ => rStep,
+ _ => javaStep)
test("Apply fundamental steps all the time.") {
val conf = KubernetesConf(
@@ -211,6 +216,31 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
JAVA_STEP_TYPE)
}
+ test("Apply R step if main resource is R.") {
+ val conf = KubernetesConf(
+ new SparkConf(false),
+ KubernetesDriverSpecificConf(
+ Some(RMainAppResource("example.R")),
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String])
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
+ SERVICE_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ R_STEP_TYPE)
+ }
private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec,
stepTypes: String*)
: Unit = {
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
new file mode 100644
index 0000000..e627883
--- /dev/null
+++
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/R/Dockerfile
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG base_img
+FROM $base_img
+WORKDIR /
+RUN mkdir ${SPARK_HOME}/R
+COPY R ${SPARK_HOME}/R
+
+RUN apk add --no-cache R R-dev
+
+ENV R_HOME /usr/lib/R
+
+WORKDIR /opt/spark/work-dir
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 8bdb0f7..216e8fe 100755
---
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -38,7 +38,7 @@ fi
SPARK_K8S_CMD="$1"
case "$SPARK_K8S_CMD" in
- driver | driver-py | executor)
+ driver | driver-py | driver-r | executor)
shift 1
;;
"")
@@ -66,6 +66,10 @@ if [ -n "$PYSPARK_APP_ARGS" ]; then
PYSPARK_ARGS="$PYSPARK_APP_ARGS"
fi
+R_ARGS=""
+if [ -n "$R_APP_ARGS" ]; then
+ R_ARGS="$R_APP_ARGS"
+fi
if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
pyv="$(python -V 2>&1)"
@@ -96,6 +100,14 @@ case "$SPARK_K8S_CMD" in
"$@" $PYSPARK_PRIMARY $PYSPARK_ARGS
)
;;
+ driver-r)
+ CMD=(
+ "$SPARK_HOME/bin/spark-submit"
+ --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+ --deploy-mode client
+ "$@" $R_PRIMARY $R_ARGS
+ )
+ ;;
executor)
CMD=(
${JAVA_HOME}/bin/java
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 159cfd9..c8bd584 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import
org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.{k8sTestTag,
INTERVAL, TIMEOUT}
-trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
+private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
test("Run in client mode.", k8sTestTag) {
val labels = Map("spark-app-selector" -> driverPodName)
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 13ce2ef..896a83a 100644
---
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,10 +38,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite
import KubernetesSuite._
- protected var testBackend: IntegrationTestBackend = _
- protected var sparkHomeDir: Path = _
+ private var sparkHomeDir: Path = _
+ private var pyImage: String = _
+ private var rImage: String = _
+
protected var image: String = _
- protected var pyImage: String = _
+ protected var testBackend: IntegrationTestBackend = _
protected var driverPodName: String = _
protected var kubernetesTestComponents: KubernetesTestComponents = _
protected var sparkAppConf: SparkAppConf = _
@@ -67,6 +69,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
val imageRepo = getTestImageRepo
image = s"$imageRepo/spark:$imageTag"
pyImage = s"$imageRepo/spark-py:$imageTag"
+ rImage = s"$imageRepo/spark-r:$imageTag"
val sparkDistroExamplesJarFile: File =
sparkHomeDir.resolve(Paths.get("examples", "jars"))
.toFile
@@ -239,6 +242,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite
assert(driverPod.getSpec.getContainers.get(0).getName ===
"spark-kubernetes-driver")
}
+ protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = {
+ assert(driverPod.getMetadata.getName === driverPodName)
+ assert(driverPod.getSpec.getContainers.get(0).getImage === rImage)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+ }
+
+
protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
@@ -249,6 +259,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}
+ protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
+ assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
+ assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ }
+
protected def checkCustomSettings(pod: Pod): Unit = {
assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
----------------------------------------------------------------------
diff --git
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
new file mode 100644
index 0000000..885a23c
--- /dev/null
+++
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
+
+private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>
+
+ import RTestsSuite._
+ import KubernetesSuite.k8sTestTag
+
+ test("Run SparkR on simple dataframe.R example", k8sTestTag) {
+    sparkAppConf
+      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
+ runSparkApplicationAndVerifyCompletion(
+ appResource = SPARK_R_DATAFRAME_TEST,
+ mainClass = "",
+      expectedLogOnCompletion = Seq("name: string (nullable = true)", "1 Justin"),
+ appArgs = Array.empty[String],
+ driverPodChecker = doBasicDriverRPodCheck,
+ executorPodChecker = doBasicExecutorRPodCheck,
+ appLocator = appLocator,
+ isJVM = false)
+ }
+}
+
+private[spark] object RTestsSuite {
+  val CONTAINER_LOCAL_SPARKR: String = "local:///opt/spark/examples/src/main/r/"
+ val SPARK_R_DATAFRAME_TEST: String = CONTAINER_LOCAL_SPARKR + "dataframe.R"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]