This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e4f2b0d307f [SPARK-47475][CORE][K8S] Support `spark.kubernetes.jars.avoidDownloadSchemes` for K8s Cluster Mode
2e4f2b0d307f is described below

commit 2e4f2b0d307fa00121de77f01826c190527ebf3d
Author: jiale_tan <jiale_...@apple.com>
AuthorDate: Thu Mar 28 08:52:27 2024 -0700

    [SPARK-47475][CORE][K8S] Support `spark.kubernetes.jars.avoidDownloadSchemes` for K8s Cluster Mode
    
    ### What changes were proposed in this pull request?
    
    During spark-submit, for the K8s cluster-mode driver, instead of always downloading
    the jars and serving them to the executors, skip the download when a jar URL's
    scheme matches `spark.kubernetes.jars.avoidDownloadSchemes` in the configuration.
    
    ### Why are the changes needed?
    
    For the K8s cluster-mode driver, `SparkSubmit` downloads all the jars listed in
    `spark.jars` to the driver and replaces their URLs in `spark.jars` with the
    driver-local paths. Later, when the driver starts the `SparkContext`, it copies
    `spark.jars` into `spark.app.initial.jar.urls`, starts a file server, and replaces
    the driver-local paths in `spark.app.initial.jar.urls` with file-server URLs. When
    the executors start, they download those driver-local jars from the driver's file
    server via the URLs in `spark.app.initial.jar.urls`.
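    For illustration only (the bucket, paths, and driver host below are made up, not
    taken from this patch), the current behavior roughly rewrites the configuration
    like this:

      # at submission time
      spark.jars=s3a://bucket/deps/app.jar
      # after SparkSubmit downloads the jar on the K8s cluster-mode driver
      spark.jars=file:/opt/spark/work-dir/app.jar
      # after SparkContext starts the driver's file server
      spark.app.initial.jar.urls=spark://driver-svc.ns.svc:7078/jars/app.jar

    Every executor then fetches the jar through that single driver endpoint.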
    When the jars are big and the application requests many executors, the executors'
    massive concurrent download of the jars from the driver saturates the network. The
    executors' jar downloads then time out, causing the executors to be terminated.
    From the user's point of view, the application is trapped in a loop of massive
    executor loss and re-provisioning and never gets as many live executors as
    requested, leading to SLA breaches or outright failures.
    So instead of letting the driver download the jars and then serve them to the
    executors, if we simply keep the driver from downloading the jars and leave the
    URLs in `spark.jars` as they were, each executor will download the jars directly
    from the user-provided URLs. This avoids the driver download bottleneck described
    above, especially when the jar URLs use scalable storage schemes such as s3 or
    hdfs.
    Meanwhile, there are cases where the jar URLs use schemes that scale less well
    than the driver's file server (e.g. http or ftp), or where the jars are small or
    the executor count is low; in those cases users may still want to fall back to the
    current behavior and let the driver's file server serve the jars.
    So making the driver-side jar download and serving optional per scheme (a similar
    idea to `FORCE_DOWNLOAD_SCHEMES` on YARN) is a good approach.
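    For example (the bucket, image, and class names below are placeholders, not taken
    from this patch), a submission that lets executors pull s3a jars directly could
    look like:

      spark-submit \
        --master k8s://https://<k8s-apiserver>:443 \
        --deploy-mode cluster \
        --conf spark.kubernetes.container.image=<spark-image> \
        --conf spark.kubernetes.jars.avoidDownloadSchemes=s3a \
        --jars s3a://my-bucket/deps/big-dep.jar \
        --class org.example.MyApp \
        s3a://my-bucket/jars/my-app.jar

    With this setting the s3a URLs stay as-is in `spark.jars`, so each executor
    downloads the jars straight from S3 instead of from the driver's file server.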
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. A new configuration, `spark.kubernetes.jars.avoidDownloadSchemes`, is added.
    
    ### How was this patch tested?
    
    - Unit tests added
    - Tested with an application running on AWS EKS, submitted with a 1 GB jar on S3.
      - Before the fix, the application could not scale to 1k live executors.
      - After the fix, the application had no problem scaling beyond 12k live executors.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45715 from leletan/allow_k8s_executor_to_download_remote_jar.
    
    Authored-by: jiale_tan <jiale_...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 28 ++++++++++-----
 .../org/apache/spark/internal/config/package.scala | 12 +++++++
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 42 ++++++++++++++++++++++
 docs/running-on-kubernetes.md                      | 12 +++++++
 4 files changed, 86 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c8cbedd9ea36..c60fbe537cbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,16 +401,23 @@ private[spark] class SparkSubmit extends Logging {
         // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current
         // working directory
         // SPARK-43540: add current working directory into driver classpath
+        // SPARK-47475: make download to driver optional so executors may fetch resource from remote
+        // url directly to avoid overwhelming driver network when resource is big and executor count
+        // is high
         val workingDirectory = "."
         childClasspath += workingDirectory
-        def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false):
-        String = {
+        def downloadResourcesToCurrentDirectory(
+            uris: String,
+            isArchive: Boolean = false,
+            avoidDownload: String => Boolean = _ => false): String = {
           val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
+          val (avoidDownloads, toDownloads) =
+            resolvedUris.partition(uri => avoidDownload(uri.getScheme))
           val localResources = downloadFileList(
-            resolvedUris.map(
+            toDownloads.map(
               Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
             targetDir, sparkConf, hadoopConf)
-          Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
+          (Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map {
             case (localResources, resolvedUri) =>
               val source = new File(localResources.getPath).getCanonicalFile
               val dest = new File(
@@ -427,14 +434,19 @@ private[spark] class SparkSubmit extends Logging {
               // Keep the URIs of local files with the given fragments.
               Utils.getUriBuilder(
                 localResources).fragment(resolvedUri.getFragment).build().toString
-          }.mkString(",")
+          } ++ avoidDownloads.map(_.toString)).mkString(",")
         }
 
+        val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES)
+
+        def avoidJarDownload(scheme: String): Boolean =
+          avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme)
+
         val filesLocalFiles = Option(args.files).map {
           downloadResourcesToCurrentDirectory(_)
         }.orNull
-        val jarsLocalJars = Option(args.jars).map {
-          downloadResourcesToCurrentDirectory(_)
+        val updatedJars = Option(args.jars).map {
+          downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload)
         }.orNull
         val archiveLocalFiles = Option(args.archives).map {
           downloadResourcesToCurrentDirectory(_, true)
@@ -445,7 +457,7 @@ private[spark] class SparkSubmit extends Logging {
         args.files = filesLocalFiles
         args.archives = archiveLocalFiles
         args.pyFiles = pyLocalFiles
-        args.jars = jarsLocalJars
+        args.jars = updatedJars
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 49f24dfbd826..5a6c52481c64 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1458,6 +1458,18 @@ package object config {
       .doubleConf
       .createWithDefault(1.5)
 
+  private[spark] val KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.kubernetes.jars.avoidDownloadSchemes")
+      .doc("Comma-separated list of schemes for which jars will NOT be 
downloaded to the " +
+        "driver local disk prior to be distributed to executors, only for 
kubernetes deployment. " +
+        "For use in cases when the jars are big and executor counts are high, 
" +
+        "concurrent download causes network saturation and timeouts. " +
+        "Wildcard '*' is denoted to not downloading jars for any the schemes.")
+      .version("4.0.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
   private[spark] val FORCE_DOWNLOAD_SCHEMES =
     ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
       .doc("Comma-separated list of schemes for which resources will be 
downloaded to the " +
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index bec0a90d0f47..f55c00d7d61a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -558,6 +558,48 @@ class SparkSubmitSuite
     Files.delete(Paths.get("TestUDTF.jar"))
   }
 
+  test("SPARK-47475: Avoid jars download if scheme matches " +
+    "spark.kubernetes.jars.avoidDownloadSchemes " +
+    "in k8s client mode & driver runs inside a POD") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+    withTempDir { tmpDir =>
+      val notToDownload = File.createTempFile("NotToDownload", ".jar", tmpDir)
+      val remoteJarFile = s"s3a://${notToDownload.getAbsolutePath}"
+
+      val clArgs = Seq(
+        "--deploy-mode", "client",
+        "--proxy-user", "test.user",
+        "--master", "k8s://host:port",
+        "--class", "org.SomeClass",
+        "--conf", "spark.kubernetes.submitInDriver=true",
+        "--conf", "spark.kubernetes.jars.avoidDownloadSchemes=s3a",
+        "--conf", 
"spark.hadoop.fs.s3a.impl=org.apache.spark.deploy.TestFileSystem",
+        "--conf", "spark.hadoop.fs.s3a.impl.disable.cache=true",
+        "--files", "src/test/resources/test_metrics_config.properties",
+        "--py-files", "src/test/resources/test_metrics_system.properties",
+        "--archives", "src/test/resources/log4j2.properties",
+        "--jars", s"src/test/resources/TestUDTF.jar,$remoteJarFile",
+        "/home/jarToIgnore.jar",
+        "arg1")
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
+      conf.get("spark.master") should be("k8s://https://host:port";)
+      conf.get("spark.jars").contains(remoteJarFile) shouldBe true
+      conf.get("spark.jars").contains("TestUDTF") shouldBe true
+
+      Files.exists(Paths.get("test_metrics_config.properties")) should be(true)
+      Files.exists(Paths.get("test_metrics_system.properties")) should be(true)
+      Files.exists(Paths.get("log4j2.properties")) should be(true)
+      Files.exists(Paths.get("TestUDTF.jar")) should be(true)
+      Files.exists(Paths.get(notToDownload.getName)) should be(false)
+      Files.delete(Paths.get("test_metrics_config.properties"))
+      Files.delete(Paths.get("test_metrics_system.properties"))
+      Files.delete(Paths.get("log4j2.properties"))
+      Files.delete(Paths.get("TestUDTF.jar"))
+    }
+  }
+
   test("SPARK-43014: Set `spark.app.submitTime` if missing ") {
     val clArgs1 = Seq(
       "--deploy-mode", "client",
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 01e9d6382c18..778af5f0751a 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -684,6 +684,18 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.jars.avoidDownloadSchemes</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Comma-separated list of schemes for which jars will NOT be downloaded to the
+    driver local disk prior to being distributed to executors, only for Kubernetes deployments.
+    For use in cases when the jars are big and executor counts are high,
+    concurrent download causes network saturation and timeouts.
+    Wildcard '*' denotes not downloading jars for any of the schemes.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.authenticate.submission.caCertFile</code></td>
   <td>(none)</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
