Repository: spark
Updated Branches:
  refs/heads/master 7f16c6910 -> da8c59bde


[SPARK-12559][SPARK SUBMIT] fix --packages for stand-alone cluster mode

Fixes the --packages flag for the standalone case in cluster mode. Adds to
the driver classpath the jars that are resolved via Ivy, along with any other
jars passed through `spark.jars`. Jars not resolved by Ivy are downloaded
explicitly to a temporary folder on the driver node. Similar code already
exists in SparkSubmit, so part of it was refactored into a helper used by the
DriverWrapper class, which is responsible for launching the driver in
standalone cluster mode.
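
For context, the scenario this patch targets is a submission of roughly the
following shape; the master URL, main class, package coordinate, and
application jar below are illustrative placeholders, not values taken from
the patch:

  ./bin/spark-submit \
    --master spark://<standalone-master>:7077 \
    --deploy-mode cluster \
    --class com.example.Main \
    --packages com.example:example-lib:1.0 \
    /path/to/app.jar

With this change, the coordinates passed to --packages are resolved again on
the driver node by DriverWrapper before the user's main class is invoked.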

Note: In standalone mode `spark.jars` contains the user jar, so it can be
fetched later on the executor side.

Tested manually by submitting a driver in cluster mode to a standalone
cluster and checking that the dependencies were resolved on the driver side.

Author: Stavros Kontopoulos <st.kontopou...@gmail.com>

Closes #18630 from skonto/fix_packages_stand_alone_cluster.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da8c59bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da8c59bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da8c59bd

Branch: refs/heads/master
Commit: da8c59bdeabd9ac9eace9b95eda8b8edecc8937e
Parents: 7f16c69
Author: Stavros Kontopoulos <st.kontopou...@gmail.com>
Authored: Fri Aug 11 15:49:58 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Fri Aug 11 15:52:32 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/deploy/DependencyUtils.scala   | 99 ++++++++++++++++++++
 .../org/apache/spark/deploy/SparkSubmit.scala   | 52 +++-------
 .../spark/deploy/worker/DriverWrapper.scala     | 23 +++++
 3 files changed, 137 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/da8c59bd/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
new file mode 100644
index 0000000..97f3803
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.collection.mutable.HashMap
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.MutableURLClassLoader
+
+private[deploy] object DependencyUtils {
+
+  def resolveMavenDependencies(
+      packagesExclusions: String,
+      packages: String,
+      repositories: String,
+      ivyRepoPath: String): String = {
+    val exclusions: Seq[String] =
+      if (!StringUtils.isBlank(packagesExclusions)) {
+        packagesExclusions.split(",")
+      } else {
+        Nil
+      }
+    // Create the IvySettings, either load from file or build defaults
+    val ivySettings = sys.props.get("spark.jars.ivySettings").map { ivySettingsFile =>
+      SparkSubmitUtils.loadIvySettings(ivySettingsFile, Option(repositories), Option(ivyRepoPath))
+    }.getOrElse {
+      SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+    }
+
+    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
+  }
+
+  def createTempDir(): File = {
+    val targetDir = Files.createTempDirectory("tmp").toFile
+    // scalastyle:off runtimeaddshutdownhook
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        FileUtils.deleteQuietly(targetDir)
+      }
+    })
+    // scalastyle:on runtimeaddshutdownhook
+    targetDir
+  }
+
+  def resolveAndDownloadJars(jars: String, userJar: String): String = {
+    val targetDir = DependencyUtils.createTempDir()
+    val hadoopConf = new Configuration()
+    val sparkProperties = new HashMap[String, String]()
+    val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
+      "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
+      "spark.ssl.fs.protocol", "spark.ssl.protocol")
+
+    securityProperties.foreach { pName =>
+      sys.props.get(pName).foreach { pValue =>
+        sparkProperties.put(pName, pValue)
+      }
+    }
+
+    Option(jars)
+      .map {
+        SparkSubmit.resolveGlobPaths(_, hadoopConf)
+          .split(",")
+          .filterNot(_.contains(userJar.split("/").last))
+          .mkString(",")
+      }
+      .filterNot(_ == "")
+      .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
+      .orNull
+  }
+
+  def addJarsToClassPath(jars: String, loader: MutableURLClassLoader): Unit = {
+    if (jars != null) {
+      for (jar <- jars.split(",")) {
+        SparkSubmit.addJarToClasspath(jar, loader)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/da8c59bd/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0ea1436..0197800 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.io._
 import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.URL
-import java.nio.file.Files
 import java.security.{KeyStore, PrivilegedExceptionAction}
 import java.security.cert.X509Certificate
 import java.text.ParseException
@@ -31,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.Properties
 
 import com.google.common.io.ByteStreams
-import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -300,28 +298,13 @@ object SparkSubmit extends CommandLineUtils {
     }
     val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
     val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
+    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
 
-    if (!isMesosCluster) {
+    if (!isMesosCluster && !isStandAloneCluster) {
      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
       // too for packages that include Python code
-      val exclusions: Seq[String] =
-      if (!StringUtils.isBlank(args.packagesExclusions)) {
-        args.packagesExclusions.split(",")
-      } else {
-        Nil
-      }
-
-      // Create the IvySettings, either load from file or build defaults
-      val ivySettings = args.sparkProperties.get("spark.jars.ivySettings").map 
{ ivySettingsFile =>
-        SparkSubmitUtils.loadIvySettings(ivySettingsFile, 
Option(args.repositories),
-          Option(args.ivyRepoPath))
-      }.getOrElse {
-        SparkSubmitUtils.buildIvySettings(Option(args.repositories), 
Option(args.ivyRepoPath))
-      }
-
-      val resolvedMavenCoordinates = 
SparkSubmitUtils.resolveMavenCoordinates(args.packages,
-        ivySettings, exclusions = exclusions)
-
+      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
+        args.packagesExclusions, args.packages, args.repositories, 
args.ivyRepoPath)
 
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
         args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
@@ -338,14 +321,7 @@ object SparkSubmit extends CommandLineUtils {
     }
 
     val hadoopConf = new HadoopConfiguration()
-    val targetDir = Files.createTempDirectory("tmp").toFile
-    // scalastyle:off runtimeaddshutdownhook
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        FileUtils.deleteQuietly(targetDir)
-      }
-    })
-    // scalastyle:on runtimeaddshutdownhook
+    val targetDir = DependencyUtils.createTempDir()
 
     // Resolve glob path for different resources.
     args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@@ -473,11 +449,13 @@ object SparkSubmit extends CommandLineUtils {
      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         sysProp = "spark.driver.extraLibraryPath"),
 
-      // Mesos only - propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, MESOS, CLUSTER, sysProp = "spark.jars.packages"),
-      OptionAssigner(args.repositories, MESOS, CLUSTER, sysProp = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, MESOS, CLUSTER, sysProp = "spark.jars.excludes"),
+      // Propagate attributes for dependency resolution at the driver side
+      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.packages"),
+      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
+        sysProp = "spark.jars.repositories"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, sysProp = "spark.jars.ivy"),
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
+        CLUSTER, sysProp = "spark.jars.excludes"),
 
       // Yarn only
      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
@@ -780,7 +758,7 @@ object SparkSubmit extends CommandLineUtils {
     }
   }
 
-  private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
+  private[deploy] def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
     val uri = Utils.resolveURI(localJar)
     uri.getScheme match {
       case "file" | "local" =>
@@ -845,7 +823,7 @@ object SparkSubmit extends CommandLineUtils {
   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
    */
-  private def mergeFileLists(lists: String*): String = {
+  private[deploy] def mergeFileLists(lists: String*): String = {
     val merged = lists.filterNot(StringUtils.isBlank)
                       .flatMap(_.split(","))
                       .mkString(",")
@@ -968,7 +946,7 @@ object SparkSubmit extends CommandLineUtils {
     }
   }
 
-  private def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
+  private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
     require(paths != null, "paths cannot be null.")
     paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path =>
       val uri = Utils.resolveURI(path)

http://git-wip-us.apache.org/repos/asf/spark/blob/da8c59bd/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 6799f78..cd3e361 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -19,7 +19,10 @@ package org.apache.spark.deploy.worker
 
 import java.io.File
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
@@ -51,6 +54,7 @@ object DriverWrapper {
             new MutableURLClassLoader(Array(userJarUrl), currentLoader)
           }
         Thread.currentThread.setContextClassLoader(loader)
+        setupDependencies(loader, userJar)
 
         // Delegate to supplied main class
         val clazz = Utils.classForName(mainClass)
@@ -66,4 +70,23 @@ object DriverWrapper {
         System.exit(-1)
     }
   }
+
+  private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
+    val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
+      Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
+        .map(sys.props.get(_).orNull)
+
+    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
+      packages, repositories, ivyRepoPath)
+    val jars = {
+      val jarsProp = sys.props.get("spark.jars").orNull
+      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
+        SparkSubmit.mergeFileLists(jarsProp, resolvedMavenCoordinates)
+      } else {
+        jarsProp
+      }
+    }
+    val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
+    DependencyUtils.addJarsToClassPath(localJars, loader)
+  }
 }

