This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ad79e196a60 [SPARK-45506][CONNECT] Add ivy URI support to 
SparkConnect addArtifact
2ad79e196a60 is described below

commit 2ad79e196a60d8d4a3d13ba67b9e792294c380c6
Author: Vsevolod Stepanov <[email protected]>
AuthorDate: Tue Oct 31 08:47:30 2023 +0900

    [SPARK-45506][CONNECT] Add ivy URI support to SparkConnect addArtifact
    
    ### What changes were proposed in this pull request?
    This PR extends existing SparkConnect `SparkSession.addArtifact` API to 
support Apache Ivy URIs in extend to the existing local `.jar` & `.class` file 
support. It leverages the existing support of Apache Ivy which was previously a 
part of spark-code (`SparkSubmitUtils`)
    
    This PR contains the following changes:
    1. Refactoring `SparkSubmitUtils` and moving Ivy-related parts to `commons` 
module. This results in adding two small dependencies to `common` (`apache ivy` 
and `commons-io`)
    2. Extracting some parts of `Utils` & `TestUtils` to `common`
    3. Extending SparkConnect `SparkSession.addArtifact` to support Ivy URIs
    
    Refactoring changes done in 1-2 are not altering code functionality. These 
changes are mostly about moving code between files & packages and adjusting 
some dependencies
    
    ### Why are the changes needed?
    It's possible to add maven artifacts to Spark, but SparkConnect currently 
lacks this functionality
    
    ### Does this PR introduce _any_ user-facing change?
    Yes: it extends the existing SparkConnect `SparkSession.addArtifact` API to 
support more URI types
    
    ### How was this patch tested?
    Existing tests covering Ivy resolution functionality;
    New test in `ArtifactSuite` testing ivy support in `ArtifactManager`
    New E2E test in `ReplE2ESuite` testing adding maven library and using it 
from UDF
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43354 from vsevolodstep-db/SPARK-45506/add-artifact-ivy.
    
    Authored-by: Vsevolod Stepanov <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 common/utils/pom.xml                               |  14 +
 .../org/apache/spark/util}/IvyTestUtils.scala      |  49 +-
 .../scala/org/apache/spark/util/MavenUtils.scala   | 577 +++++++++++++++++++++
 .../org/apache/spark/util/SparkEnvUtils.scala      |  32 ++
 .../org/apache/spark/util/SparkTestUtils.scala     | 106 ++++
 .../org/apache/spark/util/MavenUtilsSuite.scala    |  96 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala  |   4 +-
 .../spark/sql/application/ReplE2ESuite.scala       |  32 ++
 .../spark/sql/connect/client/ArtifactSuite.scala   |  16 +
 .../spark/sql/connect/client/ArtifactManager.scala |  46 +-
 core/pom.xml                                       |  10 -
 .../main/scala/org/apache/spark/TestUtils.scala    |  77 +--
 .../org/apache/spark/deploy/SparkSubmit.scala      | 405 +--------------
 .../org/apache/spark/internal/config/package.scala |   4 +-
 .../org/apache/spark/util/DependencyUtils.scala    |  91 +---
 .../main/scala/org/apache/spark/util/Utils.scala   |  10 +-
 .../apache/spark/deploy/RPackageUtilsSuite.scala   |   4 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |   4 +-
 .../apache/spark/util/DependencyUtilsSuite.scala   |   6 +
 .../sql/hive/client/IsolatedClientLoader.scala     |  11 +-
 20 files changed, 934 insertions(+), 660 deletions(-)

diff --git a/common/utils/pom.xml b/common/utils/pom.xml
index 44cb30a19ff0..d360e041dd64 100644
--- a/common/utils/pom.xml
+++ b/common/utils/pom.xml
@@ -55,6 +55,20 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-text</artifactId>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>oro</groupId>
+      <!-- oro is needed by ivy, but only listed as an optional dependency, so 
we include it. -->
+      <artifactId>oro</artifactId>
+      <version>${oro.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
similarity index 91%
rename from core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
rename to common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
index 0b970a03ad87..545693665e61 100644
--- a/core/src/test/scala/org/apache/spark/deploy/IvyTestUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
@@ -15,30 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.{File, FileInputStream, FileOutputStream}
-import java.util.jar.{JarEntry, JarOutputStream}
+import java.util.jar.{JarEntry, JarOutputStream, Manifest}
 import java.util.jar.Attributes.Name
-import java.util.jar.Manifest
 
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.FileUtils
 import org.apache.ivy.core.settings.IvySettings
 
-import org.apache.spark.TestUtils.{createCompiledClass, JavaSourceFromString}
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.Utils
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
-private[deploy] object IvyTestUtils {
+private[spark] object IvyTestUtils {
 
   /**
    * Create the path for the jar and pom from the maven coordinate. Extension 
should be `jar`
    * or `pom`.
    */
-  private[deploy] def pathFromCoordinate(
+  private[spark] def pathFromCoordinate(
       artifact: MavenCoordinate,
       prefix: File,
       ext: String,
@@ -55,7 +51,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Returns the artifact naming based on standard ivy or maven format. */
-  private[deploy] def artifactName(
+  private[spark] def artifactName(
       artifact: MavenCoordinate,
       useIvyLayout: Boolean,
       ext: String = ".jar"): String = {
@@ -76,7 +72,7 @@ private[deploy] object IvyTestUtils {
   }
 
   /** Write the contents to a file to the supplied directory. */
-  private[deploy] def writeFile(dir: File, fileName: String, contents: 
String): File = {
+  private[spark] def writeFile(dir: File, fileName: String, contents: String): 
File = {
     val outputFile = new File(dir, fileName)
     val outputStream = new FileOutputStream(outputFile)
     outputStream.write(contents.toCharArray.map(_.toByte))
@@ -99,7 +95,7 @@ private[deploy] object IvyTestUtils {
       className: String,
       packageName: String): Seq[(String, File)] = {
     val rFilesDir = new File(dir, "R" + File.separator + "pkg")
-    Files.createParentDirs(new File(rFilesDir, "R" + File.separator + 
"mylib.R"))
+    new File(rFilesDir, "R").mkdirs()
     val contents =
       s"""myfunc <- function(x) {
         |  SparkR:::callJStatic("$packageName.$className", "myFunc", x)
@@ -143,8 +139,8 @@ private[deploy] object IvyTestUtils {
         |}
       """.stripMargin
     val sourceFile =
-      new JavaSourceFromString(new File(dir, className).toURI.getPath, 
contents)
-    createCompiledClass(className, dir, sourceFile, Seq.empty)
+      new SparkTestUtils.JavaSourceFromString(new File(dir, 
className).toURI.getPath, contents)
+    SparkTestUtils.createCompiledClass(className, dir, sourceFile, Seq.empty)
   }
 
   private def createDescriptor(
@@ -154,11 +150,11 @@ private[deploy] object IvyTestUtils {
       useIvyLayout: Boolean): File = {
     if (useIvyLayout) {
       val ivyXmlPath = pathFromCoordinate(artifact, tempPath, "ivy", true)
-      Files.createParentDirs(new File(ivyXmlPath, "dummy"))
+      ivyXmlPath.mkdirs()
       createIvyDescriptor(ivyXmlPath, artifact, dependencies)
     } else {
       val pomPath = pathFromCoordinate(artifact, tempPath, "pom", useIvyLayout)
-      Files.createParentDirs(new File(pomPath, "dummy"))
+      pomPath.mkdirs()
       createPom(pomPath, artifact, dependencies)
     }
   }
@@ -230,12 +226,11 @@ private[deploy] object IvyTestUtils {
       val inside = deps.map(ivyArtifactWriter).mkString("\n")
       "\n  <dependencies>\n" + inside + "\n  </dependencies>"
     }.getOrElse("")
-    content += "\n</ivy-module>"
     writeFile(dir, "ivy.xml", content.trim)
   }
 
   /** Create the jar for the given maven coordinate, using the supplied files. 
*/
-  private[deploy] def packJar(
+  private[spark] def packJar(
       dir: File,
       artifact: MavenCoordinate,
       files: Seq[(String, File)],
@@ -266,7 +261,7 @@ private[deploy] object IvyTestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file._2)
-      ByteStreams.copy(in, jarStream)
+      SparkStreamUtils.copyStream(in, jarStream)
       in.close()
     }
     jarStream.close()
@@ -295,15 +290,15 @@ private[deploy] object IvyTestUtils {
       withPython: Boolean = false,
       withR: Boolean = false): File = {
     // Where the root of the repository exists, and what Ivy will search in
-    val tempPath = tempDir.getOrElse(Utils.createTempDir())
+    val tempPath = tempDir.getOrElse(SparkFileUtils.createTempDir())
     // Create directory if it doesn't exist
-    Files.createParentDirs(tempPath)
+    tempPath.mkdirs()
     // Where to create temporary class files and such
     val root = new File(tempPath, tempPath.hashCode().toString)
-    Files.createParentDirs(new File(root, "dummy"))
+    root.mkdirs()
     try {
       val jarPath = pathFromCoordinate(artifact, tempPath, "jar", useIvyLayout)
-      Files.createParentDirs(new File(jarPath, "dummy"))
+      jarPath.mkdirs()
       val className = "MyLib"
 
       val javaClass = createJavaClass(root, className, artifact.groupId)
@@ -337,14 +332,14 @@ private[deploy] object IvyTestUtils {
    * @param withPython Whether to pack python files inside the jar for 
extensive testing.
    * @return Root path of the repository. Will be `rootDir` if supplied.
    */
-  private[deploy] def createLocalRepositoryForTests(
+  private[spark] def createLocalRepositoryForTests(
       artifact: MavenCoordinate,
       dependencies: Option[String],
       rootDir: Option[File],
       useIvyLayout: Boolean = false,
       withPython: Boolean = false,
       withR: Boolean = false): File = {
-    val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
+    val deps = dependencies.map(MavenUtils.extractMavenCoordinates)
     val mainRepo = createLocalRepository(artifact, deps, rootDir, 
useIvyLayout, withPython, withR)
     deps.foreach { seq => seq.foreach { dep =>
       createLocalRepository(dep, None, Some(mainRepo), useIvyLayout, 
withPython = false)
@@ -362,7 +357,7 @@ private[deploy] object IvyTestUtils {
    * @param withPython Whether to pack python files inside the jar for 
extensive testing.
    * @return Root path of the repository. Will be `rootDir` if supplied.
    */
-  private[deploy] def withRepository(
+  private[spark] def withRepository(
       artifact: MavenCoordinate,
       dependencies: Option[String],
       rootDir: Option[File],
@@ -370,7 +365,7 @@ private[deploy] object IvyTestUtils {
       withPython: Boolean = false,
       withR: Boolean = false,
       ivySettings: IvySettings = new IvySettings)(f: String => Unit): Unit = {
-    val deps = dependencies.map(SparkSubmitUtils.extractMavenCoordinates)
+    val deps = dependencies.map(MavenUtils.extractMavenCoordinates)
     purgeLocalIvyCache(artifact, deps, ivySettings)
     val repo = createLocalRepositoryForTests(artifact, dependencies, rootDir, 
useIvyLayout,
       withPython, withR)
diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
new file mode 100644
index 000000000000..f71ea873ab2c
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{File, IOException, PrintStream}
+import java.net.URI
+import java.text.ParseException
+import java.util.UUID
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.ivy.Ivy
+import org.apache.ivy.core.LogOptions
+import org.apache.ivy.core.module.descriptor.{Artifact, 
DefaultDependencyDescriptor, DefaultExcludeRule, DefaultModuleDescriptor, 
ExcludeRule}
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
+import org.apache.ivy.core.report.ResolveReport
+import org.apache.ivy.core.resolve.ResolveOptions
+import org.apache.ivy.core.retrieve.RetrieveOptions
+import org.apache.ivy.core.settings.IvySettings
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
+import org.apache.ivy.plugins.repository.file.FileRepository
+import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, 
IBiblioResolver}
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+
+/** Provides utility functions to be used inside SparkSubmit. */
+private[spark] object MavenUtils extends Logging {
+  val JAR_IVY_SETTING_PATH_KEY: String = "spark.jars.ivySettings"
+
+//  // Exposed for testing
+//  var printStream = SparkSubmit.printStream
+
+  // Exposed for testing.
+  // These components are used to make the default exclusion rules for Spark 
dependencies.
+  // We need to specify each component explicitly, otherwise we miss
+  // spark-streaming utility components. Underscore is there to differentiate 
between
+  // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x
+  val IVY_DEFAULT_EXCLUDES: Seq[String] = Seq(
+    "catalyst_",
+    "core_",
+    "graphx_",
+    "kvstore_",
+    "launcher_",
+    "mllib_",
+    "mllib-local_",
+    "network-common_",
+    "network-shuffle_",
+    "repl_",
+    "sketch_",
+    "sql_",
+    "streaming_",
+    "tags_",
+    "unsafe_")
+
+  /**
+   * Represents a Maven Coordinate
+   *
+   * @param groupId
+   *   the groupId of the coordinate
+   * @param artifactId
+   *   the artifactId of the coordinate
+   * @param version
+   *   the version of the coordinate
+   */
+  private[spark] case class MavenCoordinate(
+      groupId: String,
+      artifactId: String,
+      version: String) {
+    override def toString: String = s"$groupId:$artifactId:$version"
+  }
+
+  /**
+   * Extracts maven coordinates from a comma-delimited string. Coordinates 
should be provided in
+   * the format `groupId:artifactId:version` or `groupId/artifactId:version`.
+   *
+   * @param coordinates
+   *   Comma-delimited string of maven coordinates
+   * @return
+   *   Sequence of Maven coordinates
+   */
+  def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
+    coordinates.split(",").map { p =>
+      val splits = p.replace("/", ":").split(":")
+      require(
+        splits.length == 3,
+        s"Provided Maven Coordinates must be in the form " +
+          s"'groupId:artifactId:version'. The coordinate provided is: $p")
+      require(
+        splits(0) != null && splits(0).trim.nonEmpty,
+        s"The groupId cannot be null or " +
+          s"be whitespace. The groupId provided is: ${splits(0)}")
+      require(
+        splits(1) != null && splits(1).trim.nonEmpty,
+        s"The artifactId cannot be null or " +
+          s"be whitespace. The artifactId provided is: ${splits(1)}")
+      require(
+        splits(2) != null && splits(2).trim.nonEmpty,
+        s"The version cannot be null or " +
+          s"be whitespace. The version provided is: ${splits(2)}")
+      new MavenCoordinate(splits(0), splits(1), splits(2))
+    }
+  }
+
+  /** Path of the local Maven cache. */
+  private[util] def m2Path: File = {
+    if (SparkEnvUtils.isTesting) {
+      // test builds delete the maven cache, and this can cause flakiness
+      new File("dummy", ".m2" + File.separator + "repository")
+    } else {
+      new File(System.getProperty("user.home"), ".m2" + File.separator + 
"repository")
+    }
+  }
+
+  /**
+   * Extracts maven coordinates from a comma-delimited string
+   *
+   * @param defaultIvyUserDir
+   *   The default user path for Ivy
+   * @return
+   *   A ChainResolver used by Ivy to search for and resolve dependencies.
+   */
+  private[util] def createRepoResolvers(defaultIvyUserDir: File): 
ChainResolver = {
+    // We need a chain resolver if we want to check multiple repositories
+    val cr = new ChainResolver
+    cr.setName("spark-list")
+
+    val localM2 = new IBiblioResolver
+    localM2.setM2compatible(true)
+    localM2.setRoot(m2Path.toURI.toString)
+    localM2.setUsepoms(true)
+    localM2.setName("local-m2-cache")
+    cr.add(localM2)
+
+    val localIvy = new FileSystemResolver
+    val localIvyRoot = new File(defaultIvyUserDir, "local")
+    localIvy.setLocal(true)
+    localIvy.setRepository(new FileRepository(localIvyRoot))
+    val ivyPattern = Seq(
+      localIvyRoot.getAbsolutePath,
+      "[organisation]",
+      "[module]",
+      "[revision]",
+      "ivys",
+      "ivy.xml").mkString(File.separator)
+    localIvy.addIvyPattern(ivyPattern)
+    val artifactPattern = Seq(
+      localIvyRoot.getAbsolutePath,
+      "[organisation]",
+      "[module]",
+      "[revision]",
+      "[type]s",
+      "[artifact](-[classifier]).[ext]").mkString(File.separator)
+    localIvy.addArtifactPattern(artifactPattern)
+    localIvy.setName("local-ivy-cache")
+    cr.add(localIvy)
+
+    // the biblio resolver resolves POM declared dependencies
+    val br: IBiblioResolver = new IBiblioResolver
+    br.setM2compatible(true)
+    br.setUsepoms(true)
+    val defaultInternalRepo: Option[String] = 
sys.env.get("DEFAULT_ARTIFACT_REPOSITORY")
+    
br.setRoot(defaultInternalRepo.getOrElse("https://repo1.maven.org/maven2/";))
+    br.setName("central")
+    cr.add(br)
+
+    val sp: IBiblioResolver = new IBiblioResolver
+    sp.setM2compatible(true)
+    sp.setUsepoms(true)
+    sp.setRoot(
+      sys.env.getOrElse("DEFAULT_ARTIFACT_REPOSITORY", 
"https://repos.spark-packages.org/";))
+    sp.setName("spark-packages")
+    cr.add(sp)
+    cr
+  }
+
+  /**
+   * Output a list of paths for the downloaded jars to be added to the 
classpath (will append to
+   * jars in SparkSubmit).
+   *
+   * @param artifacts
+   *   Sequence of dependencies that were resolved and retrieved
+   * @param cacheDirectory
+   *   Directory where jars are cached
+   * @return
+   *   List of paths for the dependencies
+   */
+  private[util] def resolveDependencyPaths(
+      artifacts: Array[AnyRef],
+      cacheDirectory: File): Seq[String] = {
+    artifacts
+      .map(_.asInstanceOf[Artifact])
+      .filter { artifactInfo =>
+        if (artifactInfo.getExt == "jar") {
+          true
+        } else {
+          logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
+          false
+        }
+      }
+      .map { artifactInfo =>
+        val artifact = artifactInfo.getModuleRevisionId
+        val extraAttrs = artifactInfo.getExtraAttributes
+        val classifier = if (extraAttrs.containsKey("classifier")) {
+          "-" + extraAttrs.get("classifier")
+        } else {
+          ""
+        }
+        cacheDirectory.getAbsolutePath + File.separator +
+          
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
+      }
+  }
+
+  /** Adds the given maven coordinates to Ivy's module descriptor. */
+  private[util] def addDependenciesToIvy(
+      md: DefaultModuleDescriptor,
+      artifacts: Seq[MavenCoordinate],
+      ivyConfName: String)(implicit printStream: PrintStream): Unit = {
+    artifacts.foreach { mvn =>
+      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, 
mvn.version)
+      val dd = new DefaultDependencyDescriptor(ri, false, false)
+      dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
+      // scalastyle:off println
+      printStream.println(s"${dd.getDependencyId} added as a dependency")
+      // scalastyle:on println
+      md.addDependency(dd)
+    }
+  }
+
+  /** Add exclusion rules for dependencies already included in the 
spark-assembly */
+  private def addExclusionRules(
+      ivySettings: IvySettings,
+      ivyConfName: String,
+      md: DefaultModuleDescriptor): Unit = {
+    // Add scala exclusion rule
+    md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, 
ivyConfName))
+
+    IVY_DEFAULT_EXCLUDES.foreach { comp =>
+      md.addExcludeRule(
+        createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings, 
ivyConfName))
+    }
+  }
+
+  /**
+   * Build Ivy Settings using options with default resolvers
+   *
+   * @param remoteRepos
+   *   Comma-delimited string of remote repositories other than maven central
+   * @param ivyPath
+   *   The path to the local ivy repository
+   * @return
+   *   An IvySettings object
+   */
+  def buildIvySettings(remoteRepos: Option[String], ivyPath: 
Option[String])(implicit
+      printStream: PrintStream): IvySettings = {
+    val ivySettings: IvySettings = new IvySettings
+    processIvyPathArg(ivySettings, ivyPath)
+
+    // create a pattern matcher
+    ivySettings.addMatcher(new GlobPatternMatcher)
+    // create the dependency resolvers
+    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
+    ivySettings.addResolver(repoResolver)
+    ivySettings.setDefaultResolver(repoResolver.getName)
+    processRemoteRepoArg(ivySettings, remoteRepos)
+    // (since 2.5) Setting the property ivy.maven.lookup.sources to false
+    // disables the lookup of the sources artifact.
+    // And setting the property ivy.maven.lookup.javadoc to false
+    // disables the lookup of the javadoc artifact.
+    ivySettings.setVariable("ivy.maven.lookup.sources", "false")
+    ivySettings.setVariable("ivy.maven.lookup.javadoc", "false")
+    ivySettings
+  }
+
+  /**
+   * Load Ivy settings from a given filename, using supplied resolvers
+   *
+   * @param settingsFile
+   *   Path to Ivy settings file
+   * @param remoteRepos
+   *   Comma-delimited string of remote repositories other than maven central
+   * @param ivyPath
+   *   The path to the local ivy repository
+   * @return
+   *   An IvySettings object
+   */
+  def loadIvySettings(settingsFile: String, remoteRepos: Option[String], 
ivyPath: Option[String])(
+      implicit printStream: PrintStream): IvySettings = {
+    val uri = new URI(settingsFile)
+    val file = Option(uri.getScheme).getOrElse("file") match {
+      case "file" => new File(uri.getPath)
+      case scheme =>
+        throw new IllegalArgumentException(
+          s"Scheme $scheme not supported in " +
+            JAR_IVY_SETTING_PATH_KEY)
+    }
+    require(file.exists(), s"Ivy settings file $file does not exist")
+    require(file.isFile(), s"Ivy settings file $file is not a normal file")
+    val ivySettings: IvySettings = new IvySettings
+    try {
+      ivySettings.load(file)
+    } catch {
+      case e @ (_: IOException | _: ParseException) =>
+        throw new SparkException(s"Failed when loading Ivy settings from 
$settingsFile", e)
+    }
+    processIvyPathArg(ivySettings, ivyPath)
+    processRemoteRepoArg(ivySettings, remoteRepos)
+    ivySettings
+  }
+
+  /* Set ivy settings for location of cache, if option is supplied */
+  private def processIvyPathArg(ivySettings: IvySettings, ivyPath: 
Option[String]): Unit = {
+    ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
+      ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
+      ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
+    }
+  }
+
+  /* Add any optional additional remote repositories */
+  private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: 
Option[String])(implicit
+      printStream: PrintStream): Unit = {
+    remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { 
repositoryList =>
+      val cr = new ChainResolver
+      cr.setName("user-list")
+
+      // add current default resolver, if any
+      Option(ivySettings.getDefaultResolver).foreach(cr.add)
+
+      // add additional repositories, last resolution in chain takes precedence
+      repositoryList.zipWithIndex.foreach { case (repo, i) =>
+        val brr: IBiblioResolver = new IBiblioResolver
+        brr.setM2compatible(true)
+        brr.setUsepoms(true)
+        brr.setRoot(repo)
+        brr.setName(s"repo-${i + 1}")
+        cr.add(brr)
+        // scalastyle:off println
+        printStream.println(s"$repo added as a remote repository with the 
name: ${brr.getName}")
+      // scalastyle:on println
+      }
+
+      ivySettings.addResolver(cr)
+      ivySettings.setDefaultResolver(cr.getName)
+    }
+  }
+
+  /** A nice function to use in tests as well. Values are dummy strings. */
+  private[util] def getModuleDescriptor: DefaultModuleDescriptor =
+    DefaultModuleDescriptor.newDefaultInstance(ModuleRevisionId
+      // Include UUID in module name, so multiple clients resolving maven 
coordinate at the
+      // same time do not modify the same resolution file concurrently.
+      .newInstance("org.apache.spark", 
s"spark-submit-parent-${UUID.randomUUID.toString}", "1.0"))
+
+  /**
+   * Clear ivy resolution from current launch. The resolution file is usually 
at
+   * ~/.ivy2/org.apache.spark-spark-submit-parent-$UUID-default.xml,
+   * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.xml, and
+   * 
~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.properties. 
Since each launch
+   * will have its own resolution files created, delete them after each 
resolution to prevent
+   * accumulation of these files in the ivy cache dir.
+   */
+  private def clearIvyResolutionFiles(
+      mdId: ModuleRevisionId,
+      ivySettings: IvySettings,
+      ivyConfName: String): Unit = {
+    val currentResolutionFiles = Seq(
+      s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
+      
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
+      
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties")
+    currentResolutionFiles.foreach { filename =>
+      new File(ivySettings.getDefaultCache, filename).delete()
+    }
+  }
+
+  /**
+   * Resolves any dependencies that were supplied through maven coordinates
+   *
+   * @param coordinates
+   *   Comma-delimited string of maven coordinates
+   * @param ivySettings
+   *   An IvySettings containing resolvers to use
+   * @param transitive
+   *   Whether resolving transitive dependencies, default is true
+   * @param exclusions
+   *   Exclusions to apply when resolving transitive dependencies
+   * @return
+   *   Seq of path to the jars of the given maven artifacts including their 
transitive
+   *   dependencies
+   */
+  def resolveMavenCoordinates(
+      coordinates: String,
+      ivySettings: IvySettings,
+      transitive: Boolean,
+      exclusions: Seq[String] = Nil,
+      isTest: Boolean = false)(implicit printStream: PrintStream): Seq[String] 
= {
+    if (coordinates == null || coordinates.trim.isEmpty) {
+      Nil
+    } else {
+      val sysOut = System.out
+      // Default configuration name for ivy
+      val ivyConfName = "default"
+
+      // A Module descriptor must be specified. Entries are dummy strings
+      val md = getModuleDescriptor
+
+      md.setDefaultConf(ivyConfName)
+      try {
+        // To prevent ivy from logging to system out
+        System.setOut(printStream)
+        val artifacts = extractMavenCoordinates(coordinates)
+        // Directories for caching downloads through ivy and storing the jars 
when maven coordinates
+        // are supplied to spark-submit
+        val packagesDirectory: File = new 
File(ivySettings.getDefaultIvyUserDir, "jars")
+        // scalastyle:off println
+        printStream.println(
+          s"Ivy Default Cache set to: 
${ivySettings.getDefaultCache.getAbsolutePath}")
+        printStream.println(s"The jars for the packages stored in: 
$packagesDirectory")
+        // scalastyle:on println
+
+        val ivy = Ivy.newInstance(ivySettings)
+        // Set resolve options to download transitive dependencies as well
+        val resolveOptions = new ResolveOptions
+        resolveOptions.setTransitive(transitive)
+        val retrieveOptions = new RetrieveOptions
+        // Turn downloading and logging off for testing
+        if (isTest) {
+          resolveOptions.setDownload(false)
+          resolveOptions.setLog(LogOptions.LOG_QUIET)
+          retrieveOptions.setLog(LogOptions.LOG_QUIET)
+        } else {
+          resolveOptions.setDownload(true)
+        }
+
+        // Add exclusion rules for Spark and Scala Library
+        addExclusionRules(ivySettings, ivyConfName, md)
+        // add all supplied maven artifacts as dependencies
+        addDependenciesToIvy(md, artifacts, ivyConfName)
+        exclusions.foreach { e =>
+          md.addExcludeRule(createExclusion(e + ":*", ivySettings, 
ivyConfName))
+        }
+        // resolve dependencies
+        val rr: ResolveReport = ivy.resolve(md, resolveOptions)
+        if (rr.hasError) {
+          throw new RuntimeException(rr.getAllProblemMessages.toString)
+        }
+        // retrieve all resolved dependencies
+        retrieveOptions.setDestArtifactPattern(
+          packagesDirectory.getAbsolutePath + File.separator +
+            "[organization]_[artifact]-[revision](-[classifier]).[ext]")
+        ivy.retrieve(
+          rr.getModuleDescriptor.getModuleRevisionId,
+          retrieveOptions.setConfs(Array(ivyConfName)))
+        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
+      } finally {
+        System.setOut(sysOut)
+        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, 
ivyConfName)
+      }
+    }
+  }
+
+  private[util] def createExclusion(
+      coords: String,
+      ivySettings: IvySettings,
+      ivyConfName: String): ExcludeRule = {
+    val c = extractMavenCoordinates(coords)(0)
+    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", 
"*")
+    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+    rule.addConfiguration(ivyConfName)
+    rule
+  }
+
+  def isInvalidQueryString(tokens: Array[String]): Boolean = {
+    tokens.length != 2 || StringUtils.isBlank(tokens(0)) || 
StringUtils.isBlank(tokens(1))
+  }
+
+  /**
+   * Parse URI query string's parameter value of `transitive` and `exclude`. 
Other invalid
+   * parameters will be ignored.
+   *
+   * @param uri
+   *   Ivy URI need to be downloaded.
+   * @return
+   *   Tuple value of parameter `transitive` and `exclude` value.
+   *
+   *   1. transitive: whether to download dependency jar of Ivy URI, default 
value is true and
+   *      this parameter value is case-insensitive. This mimics Hive's 
behaviour for parsing the
+   *      transitive parameter. Invalid value will be treat as false. Example: 
Input:
+   *      exclude=org.mortbay.jetty:jetty&transitive=true Output: true
+   *
+   * 2. exclude: comma separated exclusions to apply when resolving transitive 
dependencies,
+   * consists of `group:module` pairs separated by commas. Example: Input:
+   * excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http Output:
+   * [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
+   *
+   * 3. repos: comma separated repositories to use when resolving dependencies.
+   */
+  def parseQueryParams(uri: URI): (Boolean, String, String) = {
+    val uriQuery = uri.getQuery
+    if (uriQuery == null) {
+      (true, "", "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(MavenUtils.isInvalidQueryString)) {
+        throw new IllegalArgumentException(
+          s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+
+      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, 
default value is true
+      val transitiveParams = groupedParams.get("transitive")
+      if (transitiveParams.map(_.size).getOrElse(0) > 1) {
+        logWarning(
+          "It's best to specify `transitive` parameter in ivy URI query only 
once." +
+            " If there are multiple `transitive` parameter, we will select the 
last one")
+      }
+      val transitive =
+        transitiveParams
+          
.flatMap(_.takeRight(1).map(_._2.equalsIgnoreCase("true")).headOption)
+          .getOrElse(true)
+
+      // Parse an excluded list (e.g., 
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an Ivy URI. When download Ivy URI jar, Spark won't download 
transitive jar
+      // in a excluded list.
+      val exclusionList = groupedParams
+        .get("exclude")
+        .map { params =>
+          params
+            .map(_._2)
+            .flatMap { excludeString =>
+              val excludes = excludeString.split(",")
+              if 
(excludes.map(_.split(":")).exists(MavenUtils.isInvalidQueryString)) {
+                throw new IllegalArgumentException(
+                  s"Invalid exclude string in Ivy URI ${uri.toString}:" +
+                    " expected 'org:module,org:module,..', found " + 
excludeString)
+              }
+              excludes
+            }
+            .mkString(",")
+        }
+        .getOrElse("")
+
+      val repos = groupedParams
+        .get("repos")
+        .map { params =>
+          params
+            .map(_._2)
+            .flatMap(_.split(","))
+            .mkString(",")
+        }
+        .getOrElse("")
+
+      val validParams = Set("transitive", "exclude", "repos")
+      val invalidParams = 
groupedParams.keys.filterNot(validParams.contains).toSeq
+      if (invalidParams.nonEmpty) {
+        logWarning(
+          s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " 
+
+            s"in Ivy URI query `$uriQuery`.")
+      }
+
+      (transitive, exclusionList, repos)
+    }
+  }
+}
diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/SparkEnvUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/SparkEnvUtils.scala
new file mode 100644
index 000000000000..b54e6ee5d730
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkEnvUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+private[spark] trait SparkEnvUtils {
+
+  /**
+   * Indicates whether Spark is currently running unit tests.
+   */
+  def isTesting: Boolean = {
+    // Scala's `sys.env` creates a ton of garbage by constructing Scala 
immutable maps, so
+    // we directly use the Java APIs instead.
+    System.getenv("SPARK_TESTING") != null || 
System.getProperty("spark.testing") != null
+  }
+
+}
+
+object SparkEnvUtils extends SparkEnvUtils
diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/SparkTestUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/SparkTestUtils.scala
new file mode 100644
index 000000000000..bcb2668d31e6
--- /dev/null
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkTestUtils.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+import java.net.{URI, URL}
+import java.nio.file.Files
+import java.util.Arrays
+import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
+
+import scala.jdk.CollectionConverters._
+
+trait SparkTestUtils {
+  // Adapted from the JavaCompiler.java doc examples
+  private val SOURCE = JavaFileObject.Kind.SOURCE
+
+  private def createURI(name: String) = {
+    URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
+  }
+
+  private[spark] class JavaSourceFromString(val name: String, val code: String)
+      extends SimpleJavaFileObject(createURI(name), SOURCE) {
+    override def getCharContent(ignoreEncodingErrors: Boolean): String = code
+  }
+
+  /** Creates a compiled class with the source file. Class file will be placed 
in destDir. */
+  def createCompiledClass(
+      className: String,
+      destDir: File,
+      sourceFile: JavaSourceFromString,
+      classpathUrls: Seq[URL]): File = {
+    val compiler = ToolProvider.getSystemJavaCompiler
+
+    // Calling this outputs a class file in pwd. It's easier to just rename 
the files than
+    // build a custom FileManager that controls the output location.
+    val options = if (classpathUrls.nonEmpty) {
+      Seq(
+        "-classpath",
+        classpathUrls
+          .map {
+            _.getFile
+          }
+          .mkString(File.pathSeparator))
+    } else {
+      Seq.empty
+    }
+    compiler.getTask(null, null, null, options.asJava, null, 
Arrays.asList(sourceFile)).call()
+
+    val fileName = className + ".class"
+    val result = new File(fileName)
+    assert(result.exists(), "Compiled file not found: " + 
result.getAbsolutePath())
+    val out = new File(destDir, fileName)
+
+    // renameTo cannot handle in and out files in different filesystems
+    // use google's Files.move instead
+    Files.move(result.toPath, out.toPath)
+
+    assert(out.exists(), "Destination file not moved: " + 
out.getAbsolutePath())
+    out
+  }
+
+  /** Creates a compiled class with the given name. Class file will be placed 
in destDir. */
+  def createCompiledClass(
+      className: String,
+      destDir: File,
+      toStringValue: String = "",
+      baseClass: String = null,
+      classpathUrls: Seq[URL] = Seq.empty,
+      implementsClasses: Seq[String] = Seq.empty,
+      extraCodeBody: String = "",
+      packageName: Option[String] = None): File = {
+    val extendsText = Option(baseClass).map { c => s" extends ${c}" 
}.getOrElse("")
+    val implementsText =
+      "implements " + (implementsClasses :+ 
"java.io.Serializable").mkString(", ")
+    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
+    val sourceFile = new JavaSourceFromString(
+      className,
+      s"""
+         |$packageText
+         |public class $className $extendsText $implementsText {
+         |  @Override public String toString() { return "$toStringValue"; }
+         |
+         |  $extraCodeBody
+         |}
+        """.stripMargin)
+    createCompiledClass(className, destDir, sourceFile, classpathUrls)
+  }
+
+}
+
+object SparkTestUtils extends SparkTestUtils
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala 
b/common/utils/src/test/scala/org/apache/spark/util/MavenUtilsSuite.scala
similarity index 76%
rename from 
core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
rename to 
common/utils/src/test/scala/org/apache/spark/util/MavenUtilsSuite.scala
index f1c5165b457b..642eca3cf933 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/MavenUtilsSuite.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.{File, OutputStream, PrintStream}
 import java.net.URI
@@ -28,12 +28,14 @@ import scala.jdk.CollectionConverters._
 import org.apache.ivy.core.module.descriptor.MDArtifact
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.resolver.{AbstractResolver, ChainResolver, 
FileSystemResolver, IBiblioResolver}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.{DependencyUtils, Utils}
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
-class SparkSubmitUtilsSuite extends SparkFunSuite {
+class MavenUtilsSuite
+    extends AnyFunSuite // scalastyle:ignore funsuite
+    with BeforeAndAfterAll {
 
   private var tempIvyPath: String = _
 
@@ -41,6 +43,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     def write(b: Int) = {}
   }
 
+  implicit private val printStream: PrintStream = new BufferPrintStream
+
   /** Simple PrintStream that reads data into a buffer */
   private class BufferPrintStream extends PrintStream(noOpOutputStream) {
     val lineBuffer = ArrayBuffer[String]()
@@ -53,23 +57,21 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    // We don't want to write logs during testing
-    SparkSubmitUtils.printStream = new BufferPrintStream
-    tempIvyPath = Utils.createTempDir(namePrefix = "ivy").getAbsolutePath()
+    tempIvyPath = SparkFileUtils.createTempDir(namePrefix = 
"ivy").getAbsolutePath()
   }
 
   test("incorrect maven coordinate throws error") {
     val coordinates = Seq("a:b: ", " :a:b", "a: :b", "a:b:", ":a:b", "a::b", 
"::", "a:b", "a")
     for (coordinate <- coordinates) {
       intercept[IllegalArgumentException] {
-        SparkSubmitUtils.extractMavenCoordinates(coordinate)
+        MavenUtils.extractMavenCoordinates(coordinate)
       }
     }
   }
 
   test("create repo resolvers") {
     val settings = new IvySettings
-    val res1 = 
SparkSubmitUtils.createRepoResolvers(settings.getDefaultIvyUserDir)
+    val res1 = MavenUtils.createRepoResolvers(settings.getDefaultIvyUserDir)
     // should have central and spark-packages by default
     assert(res1.getResolvers.size() === 4)
     assert(res1.getResolvers.get(0).asInstanceOf[IBiblioResolver].getName === 
"local-m2-cache")
@@ -80,7 +82,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
 
   test("create additional resolvers") {
     val repos = "a/1,b/2,c/3"
-    val settings = SparkSubmitUtils.buildIvySettings(Option(repos), 
Some(tempIvyPath))
+    val settings = MavenUtils.buildIvySettings(Option(repos), 
Some(tempIvyPath))
     val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver]
     assert(resolver.getResolvers.size() === 4)
     val expected = repos.split(",").map(r => s"$r/")
@@ -94,19 +96,19 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
   }
 
   test("add dependencies works correctly") {
-    val md = SparkSubmitUtils.getModuleDescriptor
-    val artifacts = 
SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," +
+    val md = MavenUtils.getModuleDescriptor
+    val artifacts = 
MavenUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," +
       "com.databricks:spark-avro_2.12:0.1")
 
-    SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default")
+    MavenUtils.addDependenciesToIvy(md, artifacts, "default")
     assert(md.getDependencies.length === 2)
   }
 
   test("excludes works correctly") {
-    val md = SparkSubmitUtils.getModuleDescriptor
+    val md = MavenUtils.getModuleDescriptor
     val excludes = Seq("a:b", "c:d")
     excludes.foreach { e =>
-      md.addExcludeRule(SparkSubmitUtils.createExclusion(e + ":*", new 
IvySettings, "default"))
+      md.addExcludeRule(MavenUtils.createExclusion(e + ":*", new IvySettings, 
"default"))
     }
     val rules = md.getAllExcludeRules
     assert(rules.length === 2)
@@ -117,21 +119,21 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     assert(rule2.getOrganisation === "c")
     assert(rule2.getName === "d")
     intercept[IllegalArgumentException] {
-      SparkSubmitUtils.createExclusion("e:f:g:h", new IvySettings, "default")
+      MavenUtils.createExclusion("e:f:g:h", new IvySettings, "default")
     }
   }
 
   test("ivy path works correctly") {
-    val md = SparkSubmitUtils.getModuleDescriptor
+    val md = MavenUtils.getModuleDescriptor
     val artifacts = for (i <- 0 until 3) yield new MDArtifact(md, s"jar-$i", 
"jar", "jar")
-    val jPaths = SparkSubmitUtils.resolveDependencyPaths(artifacts.toArray, 
new File(tempIvyPath))
+    val jPaths = MavenUtils.resolveDependencyPaths(artifacts.toArray, new 
File(tempIvyPath))
     assert(jPaths.count(_.startsWith(tempIvyPath)) >= 3)
     val main = MavenCoordinate("my.awesome.lib", "mylib", "0.1")
     IvyTestUtils.withRepository(main, None, None) { repo =>
       // end to end
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+      val jarPath = MavenUtils.resolveMavenCoordinates(
         main.toString,
-        SparkSubmitUtils.buildIvySettings(Option(repo), Some(tempIvyPath)),
+        MavenUtils.buildIvySettings(Option(repo), Some(tempIvyPath)),
         transitive = true,
         isTest = true)
       assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should use 
non-default ivy path")
@@ -142,10 +144,10 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
     // Local M2 repository
-    IvyTestUtils.withRepository(main, Some(dep), 
Some(SparkSubmitUtils.m2Path)) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+    IvyTestUtils.withRepository(main, Some(dep), Some(MavenUtils.m2Path)) { 
repo =>
+      val jarPath = MavenUtils.resolveMavenCoordinates(
         main.toString,
-        SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+        MavenUtils.buildIvySettings(None, Some(tempIvyPath)),
         transitive = true,
         isTest = true)
       assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
@@ -155,9 +157,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     val settings = new IvySettings
     val ivyLocal = new File(settings.getDefaultIvyUserDir, "local" + 
File.separator)
     IvyTestUtils.withRepository(main, Some(dep), Some(ivyLocal), useIvyLayout 
= true) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+      val jarPath = MavenUtils.resolveMavenCoordinates(
         main.toString,
-        SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+        MavenUtils.buildIvySettings(None, Some(tempIvyPath)),
         transitive = true,
         isTest = true)
       assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
@@ -168,9 +170,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     settings.setDefaultIvyUserDir(new File(tempIvyPath))
     IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), 
useIvyLayout = true,
       ivySettings = settings) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+      val jarPath = MavenUtils.resolveMavenCoordinates(
         main.toString,
-        SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+        MavenUtils.buildIvySettings(None, Some(tempIvyPath)),
         transitive = true,
         isTest = true)
       assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
@@ -181,30 +183,30 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
 
   test("dependency not found throws RuntimeException") {
     intercept[RuntimeException] {
-      SparkSubmitUtils.resolveMavenCoordinates(
+      MavenUtils.resolveMavenCoordinates(
       "a:b:c",
-      SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+      MavenUtils.buildIvySettings(None, Some(tempIvyPath)),
         transitive = true,
       isTest = true)
     }
   }
 
   test("neglects Spark and Spark's dependencies") {
-    val coordinates = SparkSubmitUtils.IVY_DEFAULT_EXCLUDES
+    val coordinates = MavenUtils.IVY_DEFAULT_EXCLUDES
       .map(comp => s"org.apache.spark:spark-${comp}2.12:2.4.0")
       .mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0"
 
-    val path = SparkSubmitUtils.resolveMavenCoordinates(
+    val path = MavenUtils.resolveMavenCoordinates(
       coordinates,
-      SparkSubmitUtils.buildIvySettings(None, Some(tempIvyPath)),
+      MavenUtils.buildIvySettings(None, Some(tempIvyPath)),
       transitive = true,
       isTest = true)
     assert(path.isEmpty, "should return empty path")
     val main = MavenCoordinate("org.apache.spark", 
"spark-streaming-kafka-assembly_2.12", "1.2.0")
     IvyTestUtils.withRepository(main, None, None) { repo =>
-      val files = SparkSubmitUtils.resolveMavenCoordinates(
+      val files = MavenUtils.resolveMavenCoordinates(
         coordinates + "," + main.toString,
-        SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
+        MavenUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
         transitive = true,
         isTest = true)
       assert(files.forall(_.indexOf(main.artifactId) >= 0), "Did not return 
artifact")
@@ -215,9 +217,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
     val dep = "my.great.dep:mydep:0.5"
     IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
-      val files = SparkSubmitUtils.resolveMavenCoordinates(
+      val files = MavenUtils.resolveMavenCoordinates(
         main.toString,
-        SparkSubmitUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
+        MavenUtils.buildIvySettings(Some(repo), Some(tempIvyPath)),
         exclusions = Seq("my.great.dep:mydep"),
         transitive = true,
         isTest = true)
@@ -248,14 +250,14 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
 
     val settingsFile = Paths.get(tempIvyPath, "ivysettings.xml")
     Files.write(settingsFile, settingsText.getBytes(StandardCharsets.UTF_8))
-    val settings = SparkSubmitUtils.loadIvySettings(settingsFile.toString, 
None, None)
-    settings.setDefaultIvyUserDir(new File(tempIvyPath))  // NOTE - can't set 
this through file
+    val settings = MavenUtils.loadIvySettings(settingsFile.toString, None, 
None)
+    settings.setDefaultIvyUserDir(new File(tempIvyPath)) // NOTE - can't set 
this through file
 
     val testUtilSettings = new IvySettings
     testUtilSettings.setDefaultIvyUserDir(new File(tempIvyPath))
     IvyTestUtils.withRepository(main, Some(dep), Some(dummyIvyLocal), 
useIvyLayout = true,
       ivySettings = testUtilSettings) { repo =>
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(main.toString, 
settings,
+      val jarPath = MavenUtils.resolveMavenCoordinates(main.toString, settings,
         transitive = true, isTest = true)
       assert(jarPath.exists(_.indexOf("mylib") >= 0), "should find artifact")
       assert(jarPath.forall(_.indexOf(tempIvyPath) >= 0), "should be in new 
ivy path")
@@ -267,8 +269,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
     val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
 
     IvyTestUtils.withRepository(main, None, None) { repo =>
-      val ivySettings = SparkSubmitUtils.buildIvySettings(Some(repo), 
Some(tempIvyPath))
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+      val ivySettings = MavenUtils.buildIvySettings(Some(repo), 
Some(tempIvyPath))
+      val jarPath = MavenUtils.resolveMavenCoordinates(
         main.toString,
         ivySettings,
         transitive = true,
@@ -293,8 +295,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
         .toList
       Files.write(mainPom, lines.asJava)
 
-      val ivySettings = SparkSubmitUtils.buildIvySettings(Some(repo), 
Some(tempIvyPath))
-      val jarPath = SparkSubmitUtils.resolveMavenCoordinates(
+      val ivySettings = MavenUtils.buildIvySettings(Some(repo), 
Some(tempIvyPath))
+      val jarPath = MavenUtils.resolveMavenCoordinates(
         main.toString,
         ivySettings,
         transitive = true,
@@ -303,10 +305,4 @@ class SparkSubmitUtilsSuite extends SparkFunSuite {
         s" Resolved jars are: $jarPath")
     }
   }
-
-  test("SPARK-39501: Resolve maven dependenicy in IPv6") {
-    assume(Utils.preferIPv6)
-    DependencyUtils.resolveMavenDependencies(
-      URI.create("ivy://org.apache.logging.log4j:log4j-api:2.17.2"))
-  }
 }
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 78daaa5d3f5c..2d3e4205da9b 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -576,7 +576,7 @@ class SparkSession private[sql] (
   /**
    * Add a single artifact to the client session.
    *
-   * Currently only local files with extensions .jar and .class are supported.
+   * Currently it supports local files with extensions .jar and .class and 
Apache Ivy URIs
    *
    * @since 3.4.0
    */
@@ -586,7 +586,7 @@ class SparkSession private[sql] (
   /**
    * Add one or more artifacts to the session.
    *
-   * Currently only local files with extensions .jar and .class are supported.
+   * Currently it supports local files with extensions .jar and .class and 
Apache Ivy URIs
    *
    * @since 3.4.0
    */
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 51e58f9b0bba..8c1e298f9c77 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -26,6 +26,8 @@ import org.apache.commons.io.output.ByteArrayOutputStream
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.test.{IntegrationTestUtils, RemoteSparkSession}
+import org.apache.spark.util.IvyTestUtils
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
 class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
 
@@ -194,6 +196,36 @@ class ReplE2ESuite extends RemoteSparkSession with 
BeforeAndAfterEach {
     // scalastyle:on classforname line.size.limit
   }
 
+  test("External JAR") {
+    val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
+    IvyTestUtils.withRepository(main, None, None) { repo =>
+      val input =
+        s"""
+           |// this import will fail
+           |import my.great.lib.MyLib
+           |
+           |// making library available in the REPL to compile UDF
+           |import coursierapi.{Credentials, MavenRepository}
+           |interp.repositories() ++= Seq(MavenRepository.of("$repo"))
+           |import $$ivy.`my.great.lib:mylib:0.1`
+           |
+           |val func = udf((a: Int) => {
+           |  import my.great.lib.MyLib
+           |  MyLib.myFunc(a)
+           |})
+           |
+           |// add library to the Executor
+           |spark.addArtifact("ivy://my.great.lib:mylib:0.1?repos=$repo")
+           |
+           |spark.range(5).select(func(col("id"))).as[Int].collect()
+           |""".stripMargin
+      val output = runCommandsInShell(input)
+      // making sure the library was not available before installation
+      assertContains("not found: value my", getCleanString(errorStream))
+      assertContains("Array[Int] = Array(1, 2, 3, 4, 5)", output)
+    }
+  }
+
   test("Java UDF") {
     val input =
       """
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index 9c06f9428154..e11d2bf2e3ab 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.connect.client
 
 import java.io.InputStream
+import java.net.URI
 import java.nio.file.{Files, Path, Paths}
 import java.util.concurrent.TimeUnit
 
@@ -31,6 +32,8 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.connect.proto.AddArtifactsRequest
 import org.apache.spark.sql.connect.client.SparkConnectClient.Configuration
 import org.apache.spark.sql.test.ConnectFunSuite
+import org.apache.spark.util.IvyTestUtils
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
 class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
 
@@ -268,4 +271,17 @@ class ArtifactSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
     val receivedRequests = service.getAndClearLatestAddArtifactRequests()
     assert(receivedRequests.size == 1)
   }
+
+  test("resolve ivy") {
+    val main = new MavenCoordinate("my.great.lib", "mylib", "0.1")
+    val dep = "my.great.dep:mydep:0.5"
+    IvyTestUtils.withRepository(main, Some(dep), None) { repo =>
+      val artifacts =
+        
Artifact.newIvyArtifacts(URI.create(s"ivy://my.great.lib:mylib:0.1?repos=$repo"))
+      
assert(artifacts.exists(_.path.toString.contains("jars/my.great.lib_mylib-0.1.jar")))
+      // transitive dependency
+      
assert(artifacts.exists(_.path.toString.contains("jars/my.great.dep_mydep-0.5.jar")))
+    }
+
+  }
 }
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 7401164048ba..84ed8a56eb8d 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.connect.client
 
-import java.io.{ByteArrayInputStream, InputStream}
+import java.io.{ByteArrayInputStream, InputStream, PrintStream}
 import java.net.URI
 import java.nio.file.{Files, Path, Paths}
 import java.util.Arrays
@@ -33,11 +33,12 @@ import Artifact._
 import com.google.protobuf.ByteString
 import io.grpc.stub.StreamObserver
 import org.apache.commons.codec.digest.DigestUtils.sha256Hex
+import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.AddArtifactsResponse
 import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
-import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}
+import org.apache.spark.util.{MavenUtils, SparkFileUtils, SparkThreadUtils}
 
 /**
  * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
@@ -91,6 +92,9 @@ class ArtifactManager(
         }
         Seq[Artifact](artifact)
 
+      case "ivy" =>
+        newIvyArtifacts(uri)
+
       case other =>
         throw new UnsupportedOperationException(s"Unsupported scheme: $other")
     }
@@ -99,14 +103,14 @@ class ArtifactManager(
   /**
    * Add a single artifact to the session.
    *
-   * Currently only local files with extensions .jar and .class are supported.
+   * Currently it supports local files with extensions .jar and .class and 
Apache Ivy URIs
    */
   def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
 
   /**
    * Add multiple artifacts to the session.
    *
-   * Currently only local files with extensions .jar and .class are supported.
+   * Currently it supports local files with extensions .jar and .class and 
Apache Ivy URIs
    */
   def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
 
@@ -361,6 +365,40 @@ object Artifact {
     newArtifact(CACHE_PREFIX, "", Paths.get(id), storage)
   }
 
+  def newIvyArtifacts(uri: URI): Seq[Artifact] = {
+    implicit val printStream: PrintStream = System.err
+
+    val authority = uri.getAuthority
+    if (authority == null) {
+      throw new IllegalArgumentException(
+        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
+          " Expected 'org:module:version', found null.")
+    }
+    if (authority.split(":").length != 3) {
+      throw new IllegalArgumentException(
+        s"Invalid Ivy URI authority in uri ${uri.toString}:" +
+          s" Expected 'org:module:version', found $authority.")
+    }
+
+    val (transitive, exclusions, repos) = MavenUtils.parseQueryParams(uri)
+
+    val exclusionsList: Seq[String] =
+      if (!StringUtils.isBlank(exclusions)) {
+        exclusions.split(",")
+      } else {
+        Nil
+      }
+
+    val ivySettings = MavenUtils.buildIvySettings(Some(repos), None)
+
+    val jars = MavenUtils.resolveMavenCoordinates(
+      authority,
+      ivySettings,
+      transitive = transitive,
+      exclusions = exclusionsList)
+    jars.map(p => Paths.get(p)).map(path => newJarArtifact(path.getFileName, 
new LocalFile(path)))
+  }
+
   private def newArtifact(
       prefix: Path,
       requiredSuffix: String,
diff --git a/core/pom.xml b/core/pom.xml
index e55283b75fa3..4cb0fe055fd0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -347,16 +347,6 @@
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.ivy</groupId>
-      <artifactId>ivy</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>oro</groupId>
-      <!-- oro is needed by ivy, but only listed as an optional dependency, so 
we include it. -->
-      <artifactId>oro</artifactId>
-      <version>${oro.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.seleniumhq.selenium</groupId>
       <artifactId>selenium-java</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8c3af9850ce9..fafde3cf12c6 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -18,23 +18,21 @@
 package org.apache.spark
 
 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
-import java.net.{HttpURLConnection, InetSocketAddress, URI, URL}
+import java.net.{HttpURLConnection, InetSocketAddress, URL}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files => JavaFiles, Paths}
 import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, 
OWNER_WRITE}
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
-import java.util.{Arrays, EnumSet, Locale}
+import java.util.{EnumSet, Locale}
 import java.util.concurrent.{TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream, Manifest}
 import java.util.regex.Pattern
 import javax.net.ssl._
-import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.jdk.CollectionConverters._
 import scala.reflect.{classTag, ClassTag}
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
@@ -55,7 +53,7 @@ import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkTestUtils, Utils}
 
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
@@ -64,7 +62,7 @@ import org.apache.spark.util.Utils
  * TODO: See if we can move this to the test codebase by specifying
  * test dependencies between projects.
  */
-private[spark] object TestUtils {
+private[spark] object TestUtils extends SparkTestUtils {
 
   /**
    * Create a jar that defines classes with the given names.
@@ -144,73 +142,6 @@ private[spark] object TestUtils {
     jarFile.toURI.toURL
   }
 
-  // Adapted from the JavaCompiler.java doc examples
-  private val SOURCE = JavaFileObject.Kind.SOURCE
-  private def createURI(name: String) = {
-    URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
-  }
-
-  private[spark] class JavaSourceFromString(val name: String, val code: String)
-    extends SimpleJavaFileObject(createURI(name), SOURCE) {
-    override def getCharContent(ignoreEncodingErrors: Boolean): String = code
-  }
-
-  /** Creates a compiled class with the source file. Class file will be placed 
in destDir. */
-  def createCompiledClass(
-      className: String,
-      destDir: File,
-      sourceFile: JavaSourceFromString,
-      classpathUrls: Seq[URL]): File = {
-    val compiler = ToolProvider.getSystemJavaCompiler
-
-    // Calling this outputs a class file in pwd. It's easier to just rename 
the files than
-    // build a custom FileManager that controls the output location.
-    val options = if (classpathUrls.nonEmpty) {
-      Seq("-classpath", classpathUrls.map { _.getFile 
}.mkString(File.pathSeparator))
-    } else {
-      Seq.empty
-    }
-    compiler.getTask(null, null, null, options.asJava, null, 
Arrays.asList(sourceFile)).call()
-
-    val fileName = className + ".class"
-    val result = new File(fileName)
-    assert(result.exists(), "Compiled file not found: " + 
result.getAbsolutePath())
-    val out = new File(destDir, fileName)
-
-    // renameTo cannot handle in and out files in different filesystems
-    // use google's Files.move instead
-    Files.move(result, out)
-
-    assert(out.exists(), "Destination file not moved: " + 
out.getAbsolutePath())
-    out
-  }
-
-  /** Creates a compiled class with the given name. Class file will be placed 
in destDir. */
-  def createCompiledClass(
-      className: String,
-      destDir: File,
-      toStringValue: String = "",
-      baseClass: String = null,
-      classpathUrls: Seq[URL] = Seq.empty,
-      implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = "",
-      packageName: Option[String] = None): File = {
-    val extendsText = Option(baseClass).map { c => s" extends ${c}" 
}.getOrElse("")
-    val implementsText =
-      "implements " + (implementsClasses :+ 
"java.io.Serializable").mkString(", ")
-    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
-    val sourceFile = new JavaSourceFromString(className,
-      s"""
-         |$packageText
-         |public class $className $extendsText $implementsText {
-         |  @Override public String toString() { return "$toStringValue"; }
-         |
-         |  $extraCodeBody
-         |}
-        """.stripMargin)
-    createCompiledClass(className, destDir, sourceFile, classpathUrls)
-  }
-
   /**
    * Run some code involving jobs submitted to the given context and assert 
that the jobs spilled.
    */
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 883ca62ae22b..e60be5d5a651 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,8 +22,7 @@ import java.lang.reflect.{InvocationTargetException, 
UndeclaredThrowableExceptio
 import java.net.{URI, URL}
 import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
-import java.text.ParseException
-import java.util.{ServiceLoader, UUID}
+import java.util.ServiceLoader
 import java.util.jar.JarInputStream
 import javax.ws.rs.core.UriBuilder
 
@@ -37,17 +36,6 @@ import org.apache.hadoop.conf.{Configuration => 
HadoopConfiguration}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.ivy.Ivy
-import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor._
-import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
-import org.apache.ivy.core.report.ResolveReport
-import org.apache.ivy.core.resolve.ResolveOptions
-import org.apache.ivy.core.retrieve.RetrieveOptions
-import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.matcher.GlobPatternMatcher
-import org.apache.ivy.plugins.repository.file.FileRepository
-import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, 
IBiblioResolver}
 
 import org.apache.spark._
 import org.apache.spark.api.r.RUtils
@@ -1153,389 +1141,7 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
 
 }
 
-/** Provides utility functions to be used inside SparkSubmit. */
-private[spark] object SparkSubmitUtils extends Logging {
-
-  // Exposed for testing
-  var printStream = SparkSubmit.printStream
-
-  // Exposed for testing.
-  // These components are used to make the default exclusion rules for Spark 
dependencies.
-  // We need to specify each component explicitly, otherwise we miss
-  // spark-streaming utility components. Underscore is there to differentiate 
between
-  // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x
-  val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", 
"launcher_", "mllib_",
-    "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", 
"sql_", "streaming_",
-    "tags_", "unsafe_")
-
-  /**
-   * Represents a Maven Coordinate
-   * @param groupId the groupId of the coordinate
-   * @param artifactId the artifactId of the coordinate
-   * @param version the version of the coordinate
-   */
-  private[deploy] case class MavenCoordinate(groupId: String, artifactId: 
String, version: String) {
-    override def toString: String = s"$groupId:$artifactId:$version"
-  }
-
-  /**
-   * Extracts maven coordinates from a comma-delimited string. Coordinates 
should be provided
-   * in the format `groupId:artifactId:version` or 
`groupId/artifactId:version`.
-   * @param coordinates Comma-delimited string of maven coordinates
-   * @return Sequence of Maven coordinates
-   */
-  def extractMavenCoordinates(coordinates: String): Seq[MavenCoordinate] = {
-    coordinates.split(",").map { p =>
-      val splits = p.replace("/", ":").split(":")
-      require(splits.length == 3, s"Provided Maven Coordinates must be in the 
form " +
-        s"'groupId:artifactId:version'. The coordinate provided is: $p")
-      require(splits(0) != null && splits(0).trim.nonEmpty, s"The groupId 
cannot be null or " +
-        s"be whitespace. The groupId provided is: ${splits(0)}")
-      require(splits(1) != null && splits(1).trim.nonEmpty, s"The artifactId 
cannot be null or " +
-        s"be whitespace. The artifactId provided is: ${splits(1)}")
-      require(splits(2) != null && splits(2).trim.nonEmpty, s"The version 
cannot be null or " +
-        s"be whitespace. The version provided is: ${splits(2)}")
-      new MavenCoordinate(splits(0), splits(1), splits(2))
-    }
-  }
-
-  /** Path of the local Maven cache. */
-  private[spark] def m2Path: File = {
-    if (Utils.isTesting) {
-      // test builds delete the maven cache, and this can cause flakiness
-      new File("dummy", ".m2" + File.separator + "repository")
-    } else {
-      new File(System.getProperty("user.home"), ".m2" + File.separator + 
"repository")
-    }
-  }
-
-  /**
-   * Extracts maven coordinates from a comma-delimited string
-   * @param defaultIvyUserDir The default user path for Ivy
-   * @return A ChainResolver used by Ivy to search for and resolve 
dependencies.
-   */
-  def createRepoResolvers(defaultIvyUserDir: File): ChainResolver = {
-    // We need a chain resolver if we want to check multiple repositories
-    val cr = new ChainResolver
-    cr.setName("spark-list")
-
-    val localM2 = new IBiblioResolver
-    localM2.setM2compatible(true)
-    localM2.setRoot(m2Path.toURI.toString)
-    localM2.setUsepoms(true)
-    localM2.setName("local-m2-cache")
-    cr.add(localM2)
-
-    val localIvy = new FileSystemResolver
-    val localIvyRoot = new File(defaultIvyUserDir, "local")
-    localIvy.setLocal(true)
-    localIvy.setRepository(new FileRepository(localIvyRoot))
-    val ivyPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", 
"[module]", "[revision]",
-      "ivys", "ivy.xml").mkString(File.separator)
-    localIvy.addIvyPattern(ivyPattern)
-    val artifactPattern = Seq(localIvyRoot.getAbsolutePath, "[organisation]", 
"[module]",
-      "[revision]", "[type]s", 
"[artifact](-[classifier]).[ext]").mkString(File.separator)
-    localIvy.addArtifactPattern(artifactPattern)
-    localIvy.setName("local-ivy-cache")
-    cr.add(localIvy)
-
-    // the biblio resolver resolves POM declared dependencies
-    val br: IBiblioResolver = new IBiblioResolver
-    br.setM2compatible(true)
-    br.setUsepoms(true)
-    val defaultInternalRepo : Option[String] = 
sys.env.get("DEFAULT_ARTIFACT_REPOSITORY")
-    
br.setRoot(defaultInternalRepo.getOrElse("https://repo1.maven.org/maven2/";))
-    br.setName("central")
-    cr.add(br)
-
-    val sp: IBiblioResolver = new IBiblioResolver
-    sp.setM2compatible(true)
-    sp.setUsepoms(true)
-    sp.setRoot(sys.env.getOrElse(
-      "DEFAULT_ARTIFACT_REPOSITORY", "https://repos.spark-packages.org/";))
-    sp.setName("spark-packages")
-    cr.add(sp)
-    cr
-  }
-
-  /**
-   * Output a list of paths for the downloaded jars to be added to the 
classpath
-   * (will append to jars in SparkSubmit).
-   * @param artifacts Sequence of dependencies that were resolved and retrieved
-   * @param cacheDirectory Directory where jars are cached
-   * @return List of paths for the dependencies
-   */
-  def resolveDependencyPaths(
-      artifacts: Array[AnyRef],
-      cacheDirectory: File): Seq[String] = {
-    artifacts.map(_.asInstanceOf[Artifact]).filter { artifactInfo =>
-      if (artifactInfo.getExt == "jar") {
-        true
-      } else {
-        logInfo(s"Skipping non-jar dependency ${artifactInfo.getId}")
-        false
-      }
-    }.map { artifactInfo =>
-      val artifact = artifactInfo.getModuleRevisionId
-      val extraAttrs = artifactInfo.getExtraAttributes
-      val classifier = if (extraAttrs.containsKey("classifier")) {
-        "-" + extraAttrs.get("classifier")
-      } else {
-        ""
-      }
-      cacheDirectory.getAbsolutePath + File.separator +
-        
s"${artifact.getOrganisation}_${artifact.getName}-${artifact.getRevision}$classifier.jar"
-    }
-  }
-
-  /** Adds the given maven coordinates to Ivy's module descriptor. */
-  def addDependenciesToIvy(
-      md: DefaultModuleDescriptor,
-      artifacts: Seq[MavenCoordinate],
-      ivyConfName: String): Unit = {
-    artifacts.foreach { mvn =>
-      val ri = ModuleRevisionId.newInstance(mvn.groupId, mvn.artifactId, 
mvn.version)
-      val dd = new DefaultDependencyDescriptor(ri, false, false)
-      dd.addDependencyConfiguration(ivyConfName, ivyConfName + "(runtime)")
-      // scalastyle:off println
-      printStream.println(s"${dd.getDependencyId} added as a dependency")
-      // scalastyle:on println
-      md.addDependency(dd)
-    }
-  }
-
-  /** Add exclusion rules for dependencies already included in the 
spark-assembly */
-  def addExclusionRules(
-      ivySettings: IvySettings,
-      ivyConfName: String,
-      md: DefaultModuleDescriptor): Unit = {
-    // Add scala exclusion rule
-    md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, 
ivyConfName))
-
-    IVY_DEFAULT_EXCLUDES.foreach { comp =>
-      md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", 
ivySettings,
-        ivyConfName))
-    }
-  }
-
-  /**
-   * Build Ivy Settings using options with default resolvers
-   * @param remoteRepos Comma-delimited string of remote repositories other 
than maven central
-   * @param ivyPath The path to the local ivy repository
-   * @return An IvySettings object
-   */
-  def buildIvySettings(remoteRepos: Option[String], ivyPath: Option[String]): 
IvySettings = {
-    val ivySettings: IvySettings = new IvySettings
-    processIvyPathArg(ivySettings, ivyPath)
-
-    // create a pattern matcher
-    ivySettings.addMatcher(new GlobPatternMatcher)
-    // create the dependency resolvers
-    val repoResolver = createRepoResolvers(ivySettings.getDefaultIvyUserDir)
-    ivySettings.addResolver(repoResolver)
-    ivySettings.setDefaultResolver(repoResolver.getName)
-    processRemoteRepoArg(ivySettings, remoteRepos)
-    // (since 2.5) Setting the property ivy.maven.lookup.sources to false
-    // disables the lookup of the sources artifact.
-    // And setting the property ivy.maven.lookup.javadoc to false
-    // disables the lookup of the javadoc artifact.
-    ivySettings.setVariable("ivy.maven.lookup.sources", "false")
-    ivySettings.setVariable("ivy.maven.lookup.javadoc", "false")
-    ivySettings
-  }
-
-  /**
-   * Load Ivy settings from a given filename, using supplied resolvers
-   * @param settingsFile Path to Ivy settings file
-   * @param remoteRepos Comma-delimited string of remote repositories other 
than maven central
-   * @param ivyPath The path to the local ivy repository
-   * @return An IvySettings object
-   */
-  def loadIvySettings(
-      settingsFile: String,
-      remoteRepos: Option[String],
-      ivyPath: Option[String]): IvySettings = {
-    val uri = new URI(settingsFile)
-    val file = Option(uri.getScheme).getOrElse("file") match {
-      case "file" => new File(uri.getPath)
-      case scheme => throw new IllegalArgumentException(s"Scheme $scheme not 
supported in " +
-        JAR_IVY_SETTING_PATH.key)
-    }
-    require(file.exists(), s"Ivy settings file $file does not exist")
-    require(file.isFile(), s"Ivy settings file $file is not a normal file")
-    val ivySettings: IvySettings = new IvySettings
-    try {
-      ivySettings.load(file)
-    } catch {
-      case e @ (_: IOException | _: ParseException) =>
-        throw new SparkException(s"Failed when loading Ivy settings from 
$settingsFile", e)
-    }
-    processIvyPathArg(ivySettings, ivyPath)
-    processRemoteRepoArg(ivySettings, remoteRepos)
-    ivySettings
-  }
-
-  /* Set ivy settings for location of cache, if option is supplied */
-  private def processIvyPathArg(ivySettings: IvySettings, ivyPath: 
Option[String]): Unit = {
-    ivyPath.filterNot(_.trim.isEmpty).foreach { alternateIvyDir =>
-      ivySettings.setDefaultIvyUserDir(new File(alternateIvyDir))
-      ivySettings.setDefaultCache(new File(alternateIvyDir, "cache"))
-    }
-  }
-
-  /* Add any optional additional remote repositories */
-  private def processRemoteRepoArg(ivySettings: IvySettings, remoteRepos: 
Option[String]): Unit = {
-    remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { 
repositoryList =>
-      val cr = new ChainResolver
-      cr.setName("user-list")
-
-      // add current default resolver, if any
-      Option(ivySettings.getDefaultResolver).foreach(cr.add)
-
-      // add additional repositories, last resolution in chain takes precedence
-      repositoryList.zipWithIndex.foreach { case (repo, i) =>
-        val brr: IBiblioResolver = new IBiblioResolver
-        brr.setM2compatible(true)
-        brr.setUsepoms(true)
-        brr.setRoot(repo)
-        brr.setName(s"repo-${i + 1}")
-        cr.add(brr)
-        // scalastyle:off println
-        printStream.println(s"$repo added as a remote repository with the 
name: ${brr.getName}")
-        // scalastyle:on println
-      }
-
-      ivySettings.addResolver(cr)
-      ivySettings.setDefaultResolver(cr.getName)
-    }
-  }
-
-  /** A nice function to use in tests as well. Values are dummy strings. */
-  def getModuleDescriptor: DefaultModuleDescriptor = 
DefaultModuleDescriptor.newDefaultInstance(
-    // Include UUID in module name, so multiple clients resolving maven 
coordinate at the same time
-    // do not modify the same resolution file concurrently.
-    ModuleRevisionId.newInstance("org.apache.spark",
-      s"spark-submit-parent-${UUID.randomUUID.toString}",
-      "1.0"))
-
-  /**
-   * Clear ivy resolution from current launch. The resolution file is usually 
at
-   * ~/.ivy2/org.apache.spark-spark-submit-parent-$UUID-default.xml,
-   * ~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.xml, and
-   * 
~/.ivy2/resolved-org.apache.spark-spark-submit-parent-$UUID-1.0.properties.
-   * Since each launch will have its own resolution files created, delete them 
after
-   * each resolution to prevent accumulation of these files in the ivy cache 
dir.
-   */
-  private def clearIvyResolutionFiles(
-      mdId: ModuleRevisionId,
-      ivySettings: IvySettings,
-      ivyConfName: String): Unit = {
-    val currentResolutionFiles = Seq(
-      s"${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml",
-      
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.xml",
-      
s"resolved-${mdId.getOrganisation}-${mdId.getName}-${mdId.getRevision}.properties"
-    )
-    currentResolutionFiles.foreach { filename =>
-      new File(ivySettings.getDefaultCache, filename).delete()
-    }
-  }
-
-  /**
-   * Resolves any dependencies that were supplied through maven coordinates
-   * @param coordinates Comma-delimited string of maven coordinates
-   * @param ivySettings An IvySettings containing resolvers to use
-   * @param transitive Whether resolving transitive dependencies, default is 
true
-   * @param exclusions Exclusions to apply when resolving transitive 
dependencies
-   * @return Seq of path to the jars of the given maven artifacts including 
their
-   *         transitive dependencies
-   */
-  def resolveMavenCoordinates(
-      coordinates: String,
-      ivySettings: IvySettings,
-      transitive: Boolean,
-      exclusions: Seq[String] = Nil,
-      isTest: Boolean = false): Seq[String] = {
-    if (coordinates == null || coordinates.trim.isEmpty) {
-      Nil
-    } else {
-      val sysOut = System.out
-      // Default configuration name for ivy
-      val ivyConfName = "default"
-
-      // A Module descriptor must be specified. Entries are dummy strings
-      val md = getModuleDescriptor
-
-      md.setDefaultConf(ivyConfName)
-      try {
-        // To prevent ivy from logging to system out
-        System.setOut(printStream)
-        val artifacts = extractMavenCoordinates(coordinates)
-        // Directories for caching downloads through ivy and storing the jars 
when maven coordinates
-        // are supplied to spark-submit
-        val packagesDirectory: File = new 
File(ivySettings.getDefaultIvyUserDir, "jars")
-        // scalastyle:off println
-        printStream.println(
-          s"Ivy Default Cache set to: 
${ivySettings.getDefaultCache.getAbsolutePath}")
-        printStream.println(s"The jars for the packages stored in: 
$packagesDirectory")
-        // scalastyle:on println
-
-        val ivy = Ivy.newInstance(ivySettings)
-        // Set resolve options to download transitive dependencies as well
-        val resolveOptions = new ResolveOptions
-        resolveOptions.setTransitive(transitive)
-        val retrieveOptions = new RetrieveOptions
-        // Turn downloading and logging off for testing
-        if (isTest) {
-          resolveOptions.setDownload(false)
-          resolveOptions.setLog(LogOptions.LOG_QUIET)
-          retrieveOptions.setLog(LogOptions.LOG_QUIET)
-        } else {
-          resolveOptions.setDownload(true)
-        }
-
-        // Add exclusion rules for Spark and Scala Library
-        addExclusionRules(ivySettings, ivyConfName, md)
-        // add all supplied maven artifacts as dependencies
-        addDependenciesToIvy(md, artifacts, ivyConfName)
-        exclusions.foreach { e =>
-          md.addExcludeRule(createExclusion(e + ":*", ivySettings, 
ivyConfName))
-        }
-        // resolve dependencies
-        val rr: ResolveReport = ivy.resolve(md, resolveOptions)
-        if (rr.hasError) {
-          throw new RuntimeException(rr.getAllProblemMessages.toString)
-        }
-        // retrieve all resolved dependencies
-        
retrieveOptions.setDestArtifactPattern(packagesDirectory.getAbsolutePath + 
File.separator +
-          "[organization]_[artifact]-[revision](-[classifier]).[ext]")
-        ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
-          retrieveOptions.setConfs(Array(ivyConfName)))
-        resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
-      } finally {
-        System.setOut(sysOut)
-        clearIvyResolutionFiles(md.getModuleRevisionId, ivySettings, 
ivyConfName)
-      }
-    }
-  }
-
-  private[deploy] def createExclusion(
-      coords: String,
-      ivySettings: IvySettings,
-      ivyConfName: String): ExcludeRule = {
-    val c = extractMavenCoordinates(coords)(0)
-    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", 
"*")
-    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
-    rule.addConfiguration(ivyConfName)
-    rule
-  }
-
-  def parseSparkConfProperty(pair: String): (String, String) = {
-    pair.split("=", 2).toSeq match {
-      case Seq(k, v) => (k, v)
-      case _ => throw new SparkException(s"Spark config without '=': $pair")
-    }
-  }
-
+private[spark] object SparkSubmitUtils {
   private[deploy] def getSubmitOperations(master: String): 
SparkSubmitOperation = {
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoaders =
@@ -1553,6 +1159,13 @@ private[spark] object SparkSubmitUtils extends Logging {
           s"clients found for master url: '$master'")
     }
   }
+
+  def parseSparkConfProperty(pair: String): (String, String) = {
+    pair.split("=", 2).toSeq match {
+      case Seq(k, v) => (k, v)
+      case _ => throw new SparkException(s"Spark config without '=': $pair")
+    }
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4f2c33d2a195..7b0fcf3433cf 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -30,7 +30,7 @@ import org.apache.spark.scheduler.{EventLoggingListener, 
SchedulingMode}
 import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO
 import org.apache.spark.storage.{DefaultTopologyMapper, 
RandomBlockReplicationPolicy}
 import org.apache.spark.unsafe.array.ByteArrayMethods
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{MavenUtils, Utils}
 import 
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_BUFFER_SIZE_BYTES
 
 package object config {
@@ -2452,7 +2452,7 @@ package object config {
       .createOptional
 
   private[spark] val JAR_IVY_SETTING_PATH =
-    ConfigBuilder("spark.jars.ivySettings")
+    ConfigBuilder(MavenUtils.JAR_IVY_SETTING_PATH_KEY)
       .doc("Path to an Ivy settings file to customize resolution of jars 
specified " +
         "using spark.jars.packages instead of the built-in defaults, such as 
maven central. " +
         "Additional repositories given by the command-line option 
--repositories " +
diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala 
b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
index e0c233757192..1d158ad50dc5 100644
--- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.io.File
+import java.io.{File, PrintStream}
 import java.net.URI
 
 import org.apache.commons.lang3.StringUtils
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.SparkSubmitUtils
+import org.apache.spark.deploy.SparkSubmit
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 
@@ -49,76 +49,6 @@ private[spark] object DependencyUtils extends Logging {
     IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, 
ivySettingsPath)
   }
 
-  private def isInvalidQueryString(tokens: Array[String]): Boolean = {
-    tokens.length != 2 || StringUtils.isBlank(tokens(0)) || 
StringUtils.isBlank(tokens(1))
-  }
-
-  /**
-   * Parse URI query string's parameter value of `transitive` and `exclude`.
-   * Other invalid parameters will be ignored.
-   *
-   * @param uri Ivy URI need to be downloaded.
-   * @return Tuple value of parameter `transitive` and `exclude` value.
-   *
-   *         1. transitive: whether to download dependency jar of Ivy URI, 
default value is true
-   *            and this parameter value is case-insensitive. This mimics 
Hive's behaviour for
-   *            parsing the transitive parameter. Invalid value will be treat 
as false.
-   *            Example: Input:  
exclude=org.mortbay.jetty:jetty&transitive=true
-   *            Output:  true
-   *
-   *         2. exclude: comma separated exclusions to apply when resolving 
transitive dependencies,
-   *            consists of `group:module` pairs separated by commas.
-   *            Example: Input:  
excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
-   *            Output:  [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
-   */
-  private def parseQueryParams(uri: URI): (Boolean, String) = {
-    val uriQuery = uri.getQuery
-    if (uriQuery == null) {
-      (true, "")
-    } else {
-      val mapTokens = uriQuery.split("&").map(_.split("="))
-      if (mapTokens.exists(isInvalidQueryString)) {
-        throw new IllegalArgumentException(
-          s"Invalid query string in Ivy URI ${uri.toString}: $uriQuery")
-      }
-      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
-
-      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, 
default value is true
-      val transitiveParams = groupedParams.get("transitive")
-      if (transitiveParams.map(_.size).getOrElse(0) > 1) {
-        logWarning("It's best to specify `transitive` parameter in ivy URI 
query only once." +
-          " If there are multiple `transitive` parameter, we will select the 
last one")
-      }
-      val transitive =
-        
transitiveParams.flatMap(_.takeRight(1).map(_._2.equalsIgnoreCase("true")).headOption)
-          .getOrElse(true)
-
-      // Parse an excluded list (e.g., 
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
-      // in an Ivy URI. When download Ivy URI jar, Spark won't download 
transitive jar
-      // in a excluded list.
-      val exclusionList = groupedParams.get("exclude").map { params =>
-        params.map(_._2).flatMap { excludeString =>
-          val excludes = excludeString.split(",")
-          if (excludes.map(_.split(":")).exists(isInvalidQueryString)) {
-            throw new IllegalArgumentException(
-              s"Invalid exclude string in Ivy URI ${uri.toString}:" +
-                " expected 'org:module,org:module,..', found " + excludeString)
-          }
-          excludes
-        }.mkString(",")
-      }.getOrElse("")
-
-      val validParams = Set("transitive", "exclude")
-      val invalidParams = 
groupedParams.keys.filterNot(validParams.contains).toSeq
-      if (invalidParams.nonEmpty) {
-        logWarning(s"Invalid parameters 
`${invalidParams.sorted.mkString(",")}` found " +
-          s"in Ivy URI query `$uriQuery`.")
-      }
-
-      (transitive, exclusionList)
-    }
-  }
-
   /**
    * Download Ivy URI's dependency jars.
    *
@@ -148,13 +78,15 @@ private[spark] object DependencyUtils extends Logging {
           s" Expected 'org:module:version', found $authority.")
     }
 
-    val (transitive, exclusionList) = parseQueryParams(uri)
-
+    val (transitive, exclusionList, repos) = MavenUtils.parseQueryParams(uri)
+    val fullReposList = Seq(ivyProperties.repositories, repos)
+      .filter(!StringUtils.isBlank(_))
+      .mkString(",")
     resolveMavenDependencies(
       transitive,
       exclusionList,
       authority,
-      ivyProperties.repositories,
+      fullReposList,
       ivyProperties.ivyRepoPath,
       Option(ivyProperties.ivySettingsPath)
     )
@@ -174,15 +106,18 @@ private[spark] object DependencyUtils extends Logging {
         Nil
       }
     // Create the IvySettings, either load from file or build defaults
+    implicit val printStream: PrintStream = SparkSubmit.printStream
     val ivySettings = ivySettingsPath match {
       case Some(path) =>
-        SparkSubmitUtils.loadIvySettings(path, Option(repositories), 
Option(ivyRepoPath))
+        MavenUtils.loadIvySettings(path, Option(repositories), 
Option(ivyRepoPath))
 
       case None =>
-        SparkSubmitUtils.buildIvySettings(Option(repositories), 
Option(ivyRepoPath))
+        MavenUtils.buildIvySettings(
+          Option(repositories),
+          Option(ivyRepoPath))
     }
 
-    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings,
+    MavenUtils.resolveMavenCoordinates(packages, ivySettings,
       transitive = packagesTransitive, exclusions = exclusions)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6e3f42bd16db..99ba13479898 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -95,6 +95,7 @@ private[spark] object CallSite {
 private[spark] object Utils
   extends Logging
   with SparkClassUtils
+  with SparkEnvUtils
   with SparkErrorUtils
   with SparkFileUtils
   with SparkSerDeUtils
@@ -1790,15 +1791,6 @@ private[spark] object Utils
    */
   val windowsDrive = "([a-zA-Z])".r
 
-  /**
-   * Indicates whether Spark is currently running unit tests.
-   */
-  def isTesting: Boolean = {
-    // Scala's `sys.env` creates a ton of garbage by constructing Scala 
immutable maps, so
-    // we directly use the Java APIs instead.
-    System.getenv("SPARK_TESTING") != null || 
System.getProperty(IS_TESTING.key) != null
-  }
-
   /**
    * Terminates a process waiting for at most the specified duration.
    *
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
index b9ee492ed1cb..904d3f228ec9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala
@@ -31,8 +31,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.r.RUtils
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
-import org.apache.spark.util.{ResetSystemProperties, Utils}
+import org.apache.spark.util.{IvyTestUtils, ResetSystemProperties, Utils}
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
 class RPackageUtilsSuite
   extends SparkFunSuite
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index f7d900b537af..80510bef2005 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -38,13 +38,13 @@ import org.apache.spark.TestUtils
 import org.apache.spark.TestUtils.JavaSourceFromString
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmit._
-import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
 import org.apache.spark.deploy.history.EventLogFileReader
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.{CommandLineUtils, DependencyUtils, 
ResetSystemProperties, Utils}
+import org.apache.spark.util.{CommandLineUtils, DependencyUtils, IvyTestUtils, 
ResetSystemProperties, Utils}
+import org.apache.spark.util.MavenUtils.MavenCoordinate
 
 trait TestPrematureExit {
   suite: SparkFunSuite =>
diff --git 
a/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala
index bf8edeff37c2..a465123ac58a 100644
--- a/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/DependencyUtilsSuite.scala
@@ -57,4 +57,10 @@ class DependencyUtilsSuite extends SparkFunSuite {
       "ivy://org.apache.test:test-test:1.0.0?exclude=org.apache: " +
       "expected 'org:module,org:module,..', found org.apache"))
   }
+
+  test("SPARK-39501: Resolve maven dependenicy in IPv6") {
+    assume(Utils.preferIPv6)
+    DependencyUtils.resolveMavenDependencies(
+      URI.create("ivy://org.apache.logging.log4j:log4j-api:2.17.2"))
+  }
 }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index d6489f043910..68b0f2176fbc 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.File
+import java.io.{File, PrintStream}
 import java.lang.reflect.InvocationTargetException
 import java.net.{URL, URLClassLoader}
 import java.util
@@ -31,14 +31,14 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.shims.ShimLoader
 
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkSubmitUtils
+import org.apache.spark.deploy.SparkSubmit
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{MutableURLClassLoader, Utils, VersionUtils}
+import org.apache.spark.util.{MavenUtils, MutableURLClassLoader, Utils, 
VersionUtils}
 
 /** Factory for `IsolatedClientLoader` with specific versions of hive. */
 private[hive] object IsolatedClientLoader extends Logging {
@@ -127,10 +127,11 @@ private[hive] object IsolatedClientLoader extends Logging 
{
         .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
       Seq("com.google.guava:guava:14.0.1") ++ hadoopJarNames
 
+    implicit val printStream: PrintStream = SparkSubmit.printStream
     val classpaths = quietly {
-      SparkSubmitUtils.resolveMavenCoordinates(
+      MavenUtils.resolveMavenCoordinates(
         hiveArtifacts.mkString(","),
-        SparkSubmitUtils.buildIvySettings(
+        MavenUtils.buildIvySettings(
           Some(remoteRepos),
           ivyPath),
         transitive = true,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to