Repository: spark
Updated Branches:
  refs/heads/master 5389013ac -> 4df8df5c2
[SPARK-15782][YARN] Set spark.jars system property in client mode

## What changes were proposed in this pull request?

When `--packages` is specified with `spark-shell`, the classes from those packages cannot be found, which I think is due to some of the changes in `SPARK-12343`. In particular, `SPARK-12343` removes a line that sets the `spark.jars` system property in client mode, which is used by the repl main class to set the classpath.

## How was this patch tested?

Tested manually. This system property is used by the repl to populate its classpath. If it is not set properly, the classes for external packages cannot be found.

tgravescs vanzin, as you may be familiar with this part of the code.

Author: Nezih Yigitbasi <[email protected]>

Closes #13527 from nezihyigitbasi/repl-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4df8df5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4df8df5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4df8df5c

Branch: refs/heads/master
Commit: 4df8df5c2e68f5a5d231c401b04d762d7a648159
Parents: 5389013
Author: Nezih Yigitbasi <[email protected]>
Authored: Wed Jun 15 14:07:36 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Wed Jun 15 14:07:36 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 20 ++++++++++++++++++++
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 12 ++++++++++++
 .../org/apache/spark/repl/SparkILoop.scala      | 11 +++++++++--
 .../main/scala/org/apache/spark/repl/Main.scala |  4 +---
 5 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4df8df5c/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d56946e..d870181 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -391,7 +391,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
-    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    _jars = Utils.getUserJars(_conf)
     _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
       .toSeq.flatten
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4df8df5c/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f9d0540..aebd98b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2352,6 +2352,26 @@ private[spark] object Utils extends Logging {
     log.info(s"Started daemon with process name: ${Utils.getProcessName()}")
     SignalUtils.registerLogger(log)
   }
+
+  /**
+   * Unions two comma-separated lists of files and filters out empty strings.
+   */
+  def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = {
+    var allFiles = Set[String]()
+    leftList.foreach { value => allFiles ++= value.split(",") }
+    rightList.foreach { value => allFiles ++= value.split(",") }
+    allFiles.filter { _.nonEmpty }
+  }
+
+  def getUserJars(conf: SparkConf): Seq[String] = {
+    val sparkJars = conf.getOption("spark.jars")
+    if (conf.get("spark.master") == "yarn") {
+      val yarnJars = conf.getOption("spark.yarn.dist.jars")
+      unionFileLists(sparkJars, yarnJars).toSeq
+    } else {
+      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
+    }
+  }
 }
 
 /**
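For reference, the behavior the two new helpers implement can be sketched standalone. The following is a minimal, illustrative Scala sketch and not part of the patch: a plain `Map` stands in for `SparkConf`, and the jar paths are made up.

```scala
// Illustrative sketch only: mirrors the logic of Utils.unionFileLists and
// Utils.getUserJars above, with a plain Map standing in for SparkConf.
object UserJarsSketch {
  // Union two comma-separated lists, dropping empty entries.
  def unionFileLists(left: Option[String], right: Option[String]): Set[String] = {
    var all = Set[String]()
    left.foreach { v => all ++= v.split(",") }
    right.foreach { v => all ++= v.split(",") }
    all.filter(_.nonEmpty)
  }

  def getUserJars(conf: Map[String, String]): Seq[String] = {
    val sparkJars = conf.get("spark.jars")
    if (conf.get("spark.master").contains("yarn")) {
      // On YARN, merge spark.jars with the jars distributed via spark.yarn.dist.jars.
      unionFileLists(sparkJars, conf.get("spark.yarn.dist.jars")).toSeq
    } else {
      // Elsewhere, only spark.jars is consulted; empty entries are dropped.
      sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
    }
  }

  def main(args: Array[String]): Unit = {
    val yarn = Map(
      "spark.master" -> "yarn",
      "spark.jars" -> "/tmp/a.jar,/tmp/b.jar",            // made-up paths
      "spark.yarn.dist.jars" -> "/tmp/c.jar,/tmp/a.jar")
    println(getUserJars(yarn).sorted) // List(/tmp/a.jar, /tmp/b.jar, /tmp/c.jar)

    val local = Map(
      "spark.master" -> "local[*]",
      "spark.jars" -> "/tmp/a.jar,,/tmp/b.jar")
    println(getUserJars(local))       // List(/tmp/a.jar, /tmp/b.jar)
  }
}
```

Note that because `unionFileLists` returns a `Set`, a jar listed in both `spark.jars` and `spark.yarn.dist.jars` is kept only once.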
http://git-wip-us.apache.org/repos/asf/spark/blob/4df8df5c/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 2718976..0b02059 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,18 @@ class SparkSubmitSuite
       appArgs.executorMemory should be ("2.3g")
     }
   }
+
+  test("comma separated list of files are unioned correctly") {
+    val left = Option("/tmp/a.jar,/tmp/b.jar")
+    val right = Option("/tmp/c.jar,/tmp/a.jar")
+    val emptyString = Option("")
+    Utils.unionFileLists(left, right) should be (Set("/tmp/a.jar", "/tmp/b.jar", "/tmp/c.jar"))
+    Utils.unionFileLists(emptyString, emptyString) should be (Set.empty)
+    Utils.unionFileLists(Option("/tmp/a.jar"), emptyString) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(emptyString, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(None, Option("/tmp/a.jar")) should be (Set("/tmp/a.jar"))
+    Utils.unionFileLists(Option("/tmp/a.jar"), None) should be (Set("/tmp/a.jar"))
+  }
   // scalastyle:on println
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.

http://git-wip-us.apache.org/repos/asf/spark/blob/4df8df5c/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b1e95d8..66de5e4 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1067,12 +1067,19 @@ object SparkILoop extends Logging {
   private def echo(msg: String) = Console println msg
 
   def getAddedJars: Array[String] = {
+    val conf = new SparkConf().setMaster(getMaster())
     val envJars = sys.env.get("ADD_JARS")
     if (envJars.isDefined) {
       logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
     }
-    val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
-    val jars = propJars.orElse(envJars).getOrElse("")
+    val jars = {
+      val userJars = Utils.getUserJars(conf)
+      if (userJars.isEmpty) {
+        envJars.getOrElse("")
+      } else {
+        userJars.mkString(",")
+      }
+    }
     Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
   }
 
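The 2.10 repl change above reorders precedence: `getAddedJars` now consults the conf first and falls back to the deprecated `ADD_JARS` environment variable only when the conf yields no jars. A standalone, illustrative sketch of that precedence (URI resolution via `Utils.resolveURIs` is elided; paths are made up):

```scala
// Illustrative sketch only: the fallback order getAddedJars now applies.
object AddedJarsSketch {
  def getAddedJars(userJars: Seq[String], envJars: Option[String]): Array[String] = {
    // Jars from the conf win; ADD_JARS is only a fallback.
    val jars =
      if (userJars.isEmpty) envJars.getOrElse("")
      else userJars.mkString(",")
    jars.split(",").filter(_.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    // Conf jars present: ADD_JARS is ignored.
    println(getAddedJars(Seq("/tmp/a.jar"), Some("/tmp/env.jar")).toSeq)
    // No conf jars: fall back to ADD_JARS.
    println(getAddedJars(Seq.empty, Some("/tmp/env.jar")).toSeq)
    // Neither set: an empty result rather than a failure.
    println(getAddedJars(Seq.empty, None).toSeq)
  }
}
```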
http://git-wip-us.apache.org/repos/asf/spark/blob/4df8df5c/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 771670f..28fe84d 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -54,9 +54,7 @@ object Main extends Logging {
   // Visible for testing
   private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
     interp = _interp
-    val jars = conf.getOption("spark.jars")
-      .map(_.replace(",", File.pathSeparator))
-      .getOrElse("")
+    val jars = Utils.getUserJars(conf).mkString(File.pathSeparator)
     val interpArguments = List(
       "-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
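On the 2.11 side, `Main` now joins the user jars with the platform path separator before handing them to the interpreter. A tiny illustrative sketch of that join (made-up paths; not part of the patch):

```scala
import java.io.File

// Illustrative sketch only: how the user jars become one classpath string.
object ReplClasspathSketch {
  def main(args: Array[String]): Unit = {
    val userJars = Seq("/tmp/a.jar", "/tmp/b.jar") // made-up paths
    val jars = userJars.mkString(File.pathSeparator)
    println(jars) // "/tmp/a.jar:/tmp/b.jar" on Unix, ';'-separated on Windows
  }
}
```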
