Repository: spark
Updated Branches:
  refs/heads/branch-1.3 1747e0a68 -> 5a55c9604


[SPARK-5979][SPARK-6032] Smaller safer --packages fix

pwendell tdas
This is the safer parts of PR #4754:
 - SPARK-5979: All dependencies with the groupId `org.apache.spark` passed 
through `--packages` were being excluded from the dependency tree, on the 
assumption that they would already be in the assembly jar. This is not always 
the case, so the exclusion rules now name each assembly-provided component 
explicitly (a short sketch of the rule construction follows below).
 - SPARK-6032: Ivy prints a large amount of log output while retrieving 
dependencies, and it was all going to `System.out`. The logging has been moved 
to `System.err` (a small sketch of the redirect pattern follows the 
SparkSubmit.scala diff below).
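
For reference, here is a minimal, self-contained sketch (not part of the patch; 
the module name "demo" and the "default" configuration are placeholders) of how 
one of the new per-component exclude rules is built with Ivy's glob matcher. The 
trailing underscore in the pattern is what lets spark-streaming_2.10 stay 
excluded while a user-supplied spark-streaming-kafka-assembly_2.10 is still 
resolved:

    import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultModuleDescriptor}
    import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
    import org.apache.ivy.core.settings.IvySettings

    // Dummy module descriptor standing in for the one SparkSubmitUtils builds.
    val ivySettings = new IvySettings
    val md = DefaultModuleDescriptor.newDefaultInstance(
      ModuleRevisionId.newInstance("org.example", "demo", "1.0"))

    // Glob rule for one assembly-provided component: matches spark-streaming_2.10,
    // spark-streaming_2.11, ... but not spark-streaming-kafka-assembly_2.10.
    val streamingArtifacts =
      new ArtifactId(new ModuleId("org.apache.spark", "spark-streaming_*"), "*", "*", "*")
    val excludeRule =
      new DefaultExcludeRule(streamingArtifacts, ivySettings.getMatcher("glob"), null)
    excludeRule.addConfiguration("default")
    md.addExcludeRule(excludeRule)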

Author: Burak Yavuz <[email protected]>

Closes #4802 from brkyvz/simple-streaming-fix and squashes the following 
commits:

e0f38cb [Burak Yavuz] Merge branch 'master' of github.com:apache/spark into simple-streaming-fix
bad921c [Burak Yavuz] [SPARK-5979][SPARK-6032] Smaller safer fix

(cherry picked from commit 6d8e5fbc0d83411174ffa59ff6a761a862eca32c)
Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a55c960
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a55c960
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a55c960

Branch: refs/heads/branch-1.3
Commit: 5a55c9604f1da0c8bd2723b12f21a2f0b77c95dd
Parents: 1747e0a
Author: Burak Yavuz <[email protected]>
Authored: Fri Feb 27 22:59:35 2015 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Fri Feb 27 22:59:41 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   | 53 ++++++++++++++------
 .../spark/deploy/SparkSubmitUtilsSuite.scala    | 16 +++++-
 2 files changed, 51 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5a55c960/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4c41108..4a74641 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -655,8 +655,7 @@ private[spark] object SparkSubmitUtils {
 
 /**
 * Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
- * in the format `groupId:artifactId:version` or `groupId/artifactId:version`. The latter provides
- * simplicity for Spark Package users.
+ * in the format `groupId:artifactId:version` or `groupId/artifactId:version`.
  * @param coordinates Comma-delimited string of maven coordinates
  * @return Sequence of Maven coordinates
  */
@@ -747,6 +746,35 @@ private[spark] object SparkSubmitUtils {
       md.addDependency(dd)
     }
   }
+  
+  /** Add exclusion rules for dependencies already included in the spark-assembly */
+  private[spark] def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
+    val scalaDependencyExcludeRule =
+      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
+    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
+    md.addExcludeRule(scalaDependencyExcludeRule)
+
+    // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
+    // other spark-streaming utility components. Underscore is there to differentiate between
+    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    components.foreach { comp =>
+      val sparkArtifacts =
+        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
+      val sparkDependencyExcludeRule =
+        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
+      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
+
+      md.addExcludeRule(sparkDependencyExcludeRule)
+    }
+  }
 
   /** A nice function to use in tests as well. Values are dummy strings. */
   private[spark] def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
@@ -768,6 +796,9 @@ private[spark] object SparkSubmitUtils {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
     } else {
+      val sysOut = System.out
+      // To prevent ivy from logging to system out
+      System.setOut(printStream)
       val artifacts = extractMavenCoordinates(coordinates)
       // Default configuration name for ivy
       val ivyConfName = "default"
@@ -811,19 +842,9 @@ private[spark] object SparkSubmitUtils {
       val md = getModuleDescriptor
       md.setDefaultConf(ivyConfName)
 
-      // Add an exclusion rule for Spark and Scala Library
-      val sparkArtifacts = new ArtifactId(new ModuleId("org.apache.spark", "*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-      val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-      val scalaDependencyExcludeRule =
-        new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-      scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      // Exclude any Spark dependencies, and add all supplied maven artifacts as dependencies
-      md.addExcludeRule(sparkDependencyExcludeRule)
-      md.addExcludeRule(scalaDependencyExcludeRule)
+      // Add exclusion rules for Spark and Scala Library
+      addExclusionRules(ivySettings, ivyConfName, md)
+      // add all supplied maven artifacts as dependencies
       addDependenciesToIvy(md, artifacts, ivyConfName)
 
       // resolve dependencies
@@ -835,7 +856,7 @@ private[spark] object SparkSubmitUtils {
       ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
         packagesDirectory.getAbsolutePath + File.separator + "[artifact](-[classifier]).[ext]",
         retrieveOptions.setConfs(Array(ivyConfName)))
-
+      System.setOut(sysOut)
       resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
     }
   }
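
A note on the System.out handling in the hunk above: the idea is simply to save 
the original stream, point System.out at SparkSubmit's printStream while Ivy 
resolves and retrieves, and restore it afterwards so that the resolved classpath 
can still be printed. An illustrative, generic sketch of that pattern (the patch 
itself restores the stream inline rather than in a finally block):

    import java.io.PrintStream

    // Illustrative helper, not part of the patch: run `body` with System.out
    // temporarily redirected to `logStream`, then restore the original stream.
    def withStdOutRedirectedTo[T](logStream: PrintStream)(body: => T): T = {
      val sysOut = System.out
      System.setOut(logStream)
      try {
        body
      } finally {
        System.setOut(sysOut)
      }
    }

    // Example: send anything a library writes to stdout to stderr instead.
    val resolvedPaths = withStdOutRedirectedTo(System.err) {
      // ... ivy.resolve / ivy.retrieve would go here ...
      "comma,separated,jar,paths"
    }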

http://git-wip-us.apache.org/repos/asf/spark/blob/5a55c960/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index ad62b35..8bcca92 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -117,8 +117,20 @@ class SparkSubmitUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("neglects Spark and Spark's dependencies") {
-    val path = SparkSubmitUtils.resolveMavenCoordinates(
-      "org.apache.spark:spark-core_2.10:1.2.0", None, None, true)
+    val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+      "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
+
+    val coordinates =
+      components.map(comp => s"org.apache.spark:spark-${comp}2.10:1.2.0").mkString(",") +
+      ",org.apache.spark:spark-core_fake:1.2.0"
+
+    val path = SparkSubmitUtils.resolveMavenCoordinates(coordinates, None, None, true)
     assert(path === "", "should return empty path")
+    // Should not exclude the following dependency. Will throw an error, because it doesn't exist,
+    // but the fact that it is checking means that it wasn't excluded.
+    intercept[RuntimeException] {
+      SparkSubmitUtils.resolveMavenCoordinates(coordinates +
+        ",org.apache.spark:spark-streaming-kafka-assembly_2.10:1.2.0", None, 
None, true)
+    }
   }
 }

