Repository: spark
Updated Branches:
  refs/heads/master 7f16c6910 -> da8c59bde
[SPARK-12559][SPARK SUBMIT] fix --packages for stand-alone cluster mode

Fixes the --packages flag for the standalone case in cluster mode. Adds to
the driver classpath the jars that are resolved via Ivy, along with any
other jars passed in `spark.jars`. Jars not resolved by Ivy are downloaded
explicitly to a tmp folder on the driver node. Similar code already exists
in SparkSubmit, so part of it was refactored for reuse by the DriverWrapper
class, which is responsible for launching the driver in standalone cluster
mode.

Note: in standalone mode `spark.jars` contains the user jar, so that it can
be fetched later on at the executor side.

Tested manually by submitting a driver in cluster mode within a standalone
cluster and checking that dependencies were resolved at the driver side.

Author: Stavros Kontopoulos <st.kontopou...@gmail.com>

Closes #18630 from skonto/fix_packages_stand_alone_cluster.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da8c59bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da8c59bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da8c59bd

Branch: refs/heads/master
Commit: da8c59bdeabd9ac9eace9b95eda8b8edecc8937e
Parents: 7f16c69
Author: Stavros Kontopoulos <st.kontopou...@gmail.com>
Authored: Fri Aug 11 15:49:58 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Fri Aug 11 15:52:32 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/DependencyUtils.scala |  99 ++++++++++++++++++++
 .../org/apache/spark/deploy/SparkSubmit.scala |  52 +++-------
 .../spark/deploy/worker/DriverWrapper.scala   |  23 +++++
 3 files changed, 137 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
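For context, the code path fixed here is the one exercised by a submission
like the sketch below; the master host, package coordinates, main class,
and jar path are illustrative placeholders, not values taken from this
patch:

    bin/spark-submit \
      --master spark://<master-host>:7077 \
      --deploy-mode cluster \
      --packages com.example:example-lib_2.11:1.0.0 \
      --class com.example.Main \
      /path/to/app.jar

Previously, nothing on the worker that launched the driver resolved
--packages, so the driver could start without those dependencies on its
classpath; the changes below make DriverWrapper perform that resolution.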

http://git-wip-us.apache.org/repos/asf/spark/blob/da8c59bd/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
new file mode 100644
index 0000000..97f3803
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.collection.mutable.HashMap
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.MutableURLClassLoader
+
+private[deploy] object DependencyUtils {
+
+  def resolveMavenDependencies(
+      packagesExclusions: String,
+      packages: String,
+      repositories: String,
+      ivyRepoPath: String): String = {
+    val exclusions: Seq[String] =
+      if (!StringUtils.isBlank(packagesExclusions)) {
+        packagesExclusions.split(",")
+      } else {
+        Nil
+      }
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
+      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories), Option(ivyRepoPath))
+    }.getOrElse {
+      SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    }
+
+    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
+  }
+
+  def createTempDir(): File = {
+    val targetDir = Files.createTempDirectory("tmp").toFile
+    // scalastyle:off runtimeaddshutdownhook
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        FileUtils.deleteQuietly(targetDir)
+      }
+    })
+    // scalastyle:on runtimeaddshutdownhook
+    targetDir
+  }
+
+  def resolveAndDownloadJars(jars: String, userJar: String): String = {
+    val targetDir = DependencyUtils.createTempDir()
+    val hadoopConf = new Configuration()
+    val sparkProperties = new HashMap[String, String]()
+    val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
+      "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
+      "spark.ssl.fs.protocol", "spark.ssl.protocol")
+
+    securityProperties.foreach { pName =>
+      sys.props.get(pName).foreach { pValue =>
+        sparkProperties.put(pName, pValue)
+      }
+    }
+
+    Option(jars)
+      .map {
+        SparkSubmit.resolveGlobPaths(_, hadoopConf)
+          .split(",")
+          .filterNot(_.contains(userJar.split("/").last))
+          .mkString(",")
+      }
+      .filterNot(_ == "")
+      .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
+      .orNull
+  }
+
+  def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
+    if (jars != null) {
+      for (jar <- jars.split(",")) {
+        SparkSubmit.addJarToClasspath(jar, loader)
+      }
+    }
+  }
+}
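A subtle point in resolveAndDownloadJars() above: the user jar is filtered
out of the jar list by its base name before anything is downloaded, because
in standalone mode `spark.jars` also carries the user jar (see the note in
the commit message) and the user jar is handled separately. A small
self-contained sketch of just that filter, with made-up paths:

    // Sketch only: illustrates the filterNot(...) step with fabricated paths.
    object FilterSketch {
      def main(args: Array[String]): Unit = {
        // One Ivy-resolved dependency plus the user jar, as a spark.jars-style list.
        val jars = "/tmp/ivy/jars/com.example_lib-1.0.0.jar,file:/work/myapp.jar"
        val userJar = "/work/myapp.jar"

        // Drop every entry whose path contains the user jar's base name.
        val filtered = jars.split(",")
          .filterNot(_.contains(userJar.split("/").last))
          .mkString(",")

        println(filtered) // prints only /tmp/ivy/jars/com.example_lib-1.0.0.jar
      }
    }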

http://git-wip-us.apache.org/repos/asf/spark/blob/da8c59bd/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0ea1436..0197800 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
-import java.nio.file.Files
 import java.security.{KeyStore, PrivilegedExceptionAction}
 import java.security.cert.X509Certificate
 import java.text.ParseException
@@ -31,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties
 
 import com.google.common.io.ByteStreams
-import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -300,28 +298,13 @@ object SparkSubmit extends CommandLineUtils {
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
+    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
 
-    if (!isMesosCluster) {
+    if (!isMesosCluster && !isStandAloneCluster) {
       // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
       // too for packages that include Python code
-      val exclusions: Seq[String] =
-        if (!StringUtils.isBlank(args.packagesExclusions)) {
-          args.packagesExclusions.split(",")
-        } else {
-          Nil
-        }
-
-      // Create the IvySettings, either load from file or build defaults
-      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map { ivySettingsFile =>
-        SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(args.repositories),
-          Option(args.ivyRepoPath))
-      }.getOrElse {
-        SparkSubmitUtils.buildIvySettings(Option(args.repositories), Option(args.ivyRepoPath))
-      }
-
-      val resolvedMavenCoordinates = SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-        ivySettings, exclusions = exclusions)
-
+      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+        args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath)
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
         args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
@@ -338,14 +321,7 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     val hadoopConf = new HadoopConfiguration()
-    val targetDir = Files.createTempDirectory("tmp").toFile
-    // scalastyle:off runtimeaddshutdownhook
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        FileUtils.deleteQuietly(targetDir)
-      }
-    })
-    // scalastyle:on runtimeaddshutdownhook
+    val targetDir = DependencyUtils.createTempDir()
 
     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -473,11 +449,13 @@ object SparkSubmit extends CommandLineUtils {
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
-      // Mesos only - propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
-      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+      // Propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+        CLUSTER, sysProp = "spark.jars.excludes"),
 
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
@@ -780,7 +758,7 @@ object SparkSubmit extends CommandLineUtils {
     }
   }
 
-  private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
+  private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
     val uri = Utils.resolveURI(localJar)
     uri.getScheme match {
       case "file" | "local" =>
@@ -845,7 +823,7 @@ object SparkSubmit extends CommandLineUtils {
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
    */
-  private def mergeFileLists(lists: String*): String = {
+  private[deploy] def mergeFileLists(lists: String*): String = {
     val merged = lists.filterNot(StringUtils.isBlank)
       .flatMap(_.split(","))
       .mkString(",")
@@ -968,7 +946,7 @@ object SparkSubmit extends CommandLineUtils {
     }
   }
 
-  private def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
+  private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
     require(paths != null, "paths cannot be null.")
     paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
       val uri = Utils.resolveURI(path)
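In the DriverWrapper change that follows, the four propagated properties
are read back by destructuring a mapped Seq, with unset properties coming
back as null. A tiny runnable sketch of that idiom; the property value here
is invented for illustration:

    object PropsSketch {
      def main(args: Array[String]): Unit = {
        // Pretend SparkSubmit propagated only --packages to this JVM.
        sys.props("spark.jars.packages") = "com.example:example-lib_2.11:1.0.0"

        // Same read-back pattern as DriverWrapper.setupDependencies:
        // each missing property becomes null rather than throwing.
        val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
          Seq("spark.jars.excludes", "spark.jars.packages",
            "spark.jars.repositories", "spark.jars.ivy")
            .map(sys.props.get(_).orNull)

        println(packages)           // com.example:example-lib_2.11:1.0.0
        println(packagesExclusions) // null
      }
    }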

http://git-wip-us.apache.org/repos/asf/spark/blob/da8c59bd/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 6799f78..cd3e361 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -19,7 +19,10 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
@@ -51,6 +54,7 @@ object DriverWrapper {
           new MutableURLClassLoader(Array(userJarUrl), currentLoader)
         }
       Thread.currentThread.setContextClassLoader(loader)
+      setupDependencies(loader, userJar)
 
       // Delegate to supplied main class
       val clazz = Utils.classForName(mainClass)
@@ -66,4 +70,23 @@ object DriverWrapper {
         System.exit(-1)
     }
   }
+
+  private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
+    val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
+      Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
+        .map(sys.props.get(_).orNull)
+
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
+      packages, repositories, ivyRepoPath)
+    val jars = {
+      val jarsProp = sys.props.get("spark.jars").orNull
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+      } else {
+        jarsProp
+      }
+    }
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
+    DependencyUtils.addJarsToClassPath(localJars, loader)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org