This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 75b40a5  [SPARK-27575][CORE][YARN] Yarn file-related confs should merge new value with existing value
75b40a5 is described below
commit 75b40a53d3fb49ccadf032dd8e75c554c67676d2
Author: Jungtaek Lim (HeartSaVioR) <[email protected]>
AuthorDate: Mon Apr 29 10:14:59 2019 -0700
    [SPARK-27575][CORE][YARN] Yarn file-related confs should merge new value with existing value
## What changes were proposed in this pull request?
This patch fixes a bug where YARN file-related configurations were overwritten whenever a new value was assigned - e.g. if `--files` is specified as an argument, `spark.yarn.dist.files` was overwritten with the argument's value. After this patch, the existing value and the new value are merged before being assigned to the configuration.
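The merge reuses SparkSubmit's existing `mergeFileLists` helper. A minimal standalone sketch of the intended semantics (a simplified, illustrative version, not the exact implementation in `SparkSubmit.scala`):
```
// Simplified stand-in for SparkSubmit's mergeFileLists helper: join
// non-blank comma-separated file lists into one comma-separated list.
def mergeFileLists(lists: String*): String =
  lists.filter(s => s != null && s.trim.nonEmpty).mkString(",")

// With this patch, an existing conf value is merged with the new value
// rather than being replaced by it:
mergeFileLists(
  "file:/etc/conf/existing.properties",   // pre-existing spark.yarn.dist.files
  "file:///tmp/new-file.conf")            // value coming from --files
// => "file:/etc/conf/existing.properties,file:///tmp/new-file.conf"
```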
## How was this patch tested?
Added a unit test, and manually tested with the command below:
> ./bin/spark-submit --verbose --files /etc/spark2/conf/spark-defaults.conf.template --master yarn-cluster --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.4.0.jar 10
where the Spark conf file contains
`spark.yarn.dist.files=file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties`
```
Spark config:
...
(spark.yarn.dist.files,file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties,file:///etc/spark2/conf/spark-defaults.conf.template)
...
```
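The same behavior can be sanity-checked programmatically. A sketch of the post-patch assignment logic (names mirror the diff below; the paths come from the manual test above, and the inline merge function is a stand-in for `mergeFileLists`):
```
import org.apache.spark.SparkConf

// Merge when a mergeFn is defined and the key is already set; otherwise
// fall back to the old overwrite behavior.
val mergeFn: Option[(String, String) => String] = Some(_ + "," + _)
val confKey = "spark.yarn.dist.files"
val newValue = "file:///etc/spark2/conf/spark-defaults.conf.template"

val conf = new SparkConf().set(confKey,
  "file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties")

if (mergeFn.isDefined && conf.contains(confKey)) {
  conf.set(confKey, mergeFn.get.apply(conf.get(confKey), newValue))
} else {
  conf.set(confKey, newValue)
}
// conf.get(confKey) now contains both entries, matching the output above.
```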
Closes #24465 from HeartSaVioR/SPARK-27575.
Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
---
.../org/apache/spark/deploy/SparkSubmit.scala | 23 +++++--
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 80 +++++++++++++++++++++-
2 files changed, 95 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9efaaa7..49d9395 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -544,10 +544,14 @@ private[spark] class SparkSubmit extends Logging {
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
-      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
-      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
-      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
-      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
+      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
+        mergeFn = Some(mergeFileLists(_, _))),
       // Other options
       OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
@@ -608,7 +612,13 @@ private[spark] class SparkSubmit extends Logging {
           (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
-        if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
+        if (opt.confKey != null) {
+          if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
+            sparkConf.set(opt.confKey,
+              opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
+          } else {
+            sparkConf.set(opt.confKey, opt.value)
+          }
+        }
       }
     }
@@ -1381,7 +1391,8 @@ private case class OptionAssigner(
     clusterManager: Int,
     deployMode: Int,
     clOption: String = null,
-    confKey: String = null)
+    confKey: String = null,
+    mergeFn: Option[(String, String) => String] = None)

 private[spark] trait SparkSubmitOperation {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 2a17245..ef6213e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.deploy

 import java.io._
-import java.net.URI
+import java.net.{URI, URL}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}

-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
@@ -897,6 +896,83 @@ class SparkSubmitSuite
     }
   }

+  test("SPARK-27575: yarn confs should merge new value with existing value") {
+    val tmpJarDir = Utils.createTempDir()
+    val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
+    val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
+
+    val tmpJarDirYarnOpt = Utils.createTempDir()
+    val jar1YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "2"), tmpJarDirYarnOpt)
+    val jar2YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "USER2"),
+      tmpJarDirYarnOpt)
+
+    val tmpFileDir = Utils.createTempDir()
+    val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
+    val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
+
+    val tmpFileDirYarnOpt = Utils.createTempDir()
+    val file1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpFileDirYarnOpt)
+    val file2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpFileDirYarnOpt)
+
+    val tmpPyFileDir = Utils.createTempDir()
+    val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
+    val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
+
+    val tmpPyFileDirYarnOpt = Utils.createTempDir()
+    val pyFile1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpPyFileDirYarnOpt)
+    val pyFile2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpPyFileDirYarnOpt)
+
+    val tmpArchiveDir = Utils.createTempDir()
+    val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
+    val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
+
+    val tmpArchiveDirYarnOpt = Utils.createTempDir()
+    val archive1YarnOpt = File.createTempFile("archive1YarnOpt", ".zip", tmpArchiveDirYarnOpt)
+    val archive2YarnOpt = File.createTempFile("archive2YarnOpt", ".zip", tmpArchiveDirYarnOpt)
+
+    val tempPyFile = File.createTempFile("tmpApp", ".py")
+    tempPyFile.deleteOnExit()
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
+      "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
+      "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
+      "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
+      "--conf", "spark.yarn.dist.files=" +
+        s"${Seq(file1YarnOpt, file2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.pyFiles=" +
+        s"${Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.jars=" +
+        s"${Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.archives=" +
+        s"${Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      tempPyFile.toURI().toString())
+
+    def assertEqualsWithURLs(expected: Set[URL], confValue: String): Unit = {
+      val confValPaths = confValue.split(",").map(new Path(_)).toSet
+      assert(expected.map(u => new Path(u.toURI)) === confValPaths)
+    }
+
+    def assertEqualsWithFiles(expected: Set[File], confValue: String): Unit = {
+      assertEqualsWithURLs(expected.map(_.toURI.toURL), confValue)
+    }
+
+    val appArgs = new SparkSubmitArguments(args)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+    assertEqualsWithURLs(
+      Set(jar1, jar2, jar1YarnOpt, jar2YarnOpt), conf.get("spark.yarn.dist.jars"))
+    assertEqualsWithFiles(
+      Set(file1, file2, file1YarnOpt, file2YarnOpt), conf.get("spark.yarn.dist.files"))
+    assertEqualsWithFiles(
+      Set(pyFile1, pyFile2, pyFile1YarnOpt, pyFile2YarnOpt), conf.get("spark.yarn.dist.pyFiles"))
+    assertEqualsWithFiles(Set(archive1, archive2, archive1YarnOpt, archive2YarnOpt),
+      conf.get("spark.yarn.dist.archives"))
+  }
+
   // scalastyle:on println
   private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = {