This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dd418e3b3502 [SPARK-52334][CORE][K8S] Update all files, jars, and pyFiles to reference the working directory after they are downloaded
dd418e3b3502 is described below

commit dd418e3b35026f09e75d0a30f99c8e27b02aee0a
Author: TongWei1105 <[email protected]>
AuthorDate: Fri Dec 5 09:25:25 2025 -0800

    [SPARK-52334][CORE][K8S] Update all files, jars, and pyFiles to reference the working directory after they are downloaded
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a bug where submitting a Spark job with the --files option and also calling SparkContext.addFile() for a file with the same name causes Spark to throw an exception:
    `Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text)`
    
    ### Why are the changes needed?
    
    To reproduce:
    1. Submit a Spark application using spark-submit with the --files option:
    `bin/spark-submit --files s3://bucket/a.text --class testDemo app.jar`
    2. In the testDemo application code, call:
    `sc.addFile("a.text", true)`
    
    This works correctly in YARN mode, but throws an error in Kubernetes mode.
    After [SPARK-33782](https://issues.apache.org/jira/browse/SPARK-33782), in Kubernetes mode, --files, --jars, --archives, and --pyFiles are all downloaded to the working directory.
    
    However, in the code, `args.files = filesLocalFiles`, and `filesLocalFiles` refers to the temporary download path, not the working directory.
    This causes issues when user code such as testDemo calls `sc.addFile("a.text", true)`, resulting in an error such as:
    `Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: File a.text was already registered with a different path (old path = /tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path = /opt/spark/work-dir/a.text)`
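    
    To illustrate the failure mode, here is a minimal, hypothetical sketch of the name-to-path bookkeeping that raises this error; it is a simplification for illustration, not Spark's actual addFile implementation:
    
    ```scala
    // Hypothetical model of the "one canonical path per file name" check.
    import scala.collection.mutable
    
    object FileRegistry {
      private val registered = mutable.Map.empty[String, String]
    
      def register(name: String, path: String): Unit =
        registered.get(name) match {
          case Some(oldPath) if oldPath != path =>
            throw new IllegalArgumentException(
              s"requirement failed: File $name was already registered with a " +
                s"different path (old path = $oldPath, new path = $path)")
          case _ => registered(name) = path
        }
    }
    
    object Demo extends App {
      // --files first registers the file under its temporary download path ...
      FileRegistry.register("a.text", "/tmp/spark-6aa5129d/a.text")
      // ... then sc.addFile resolves the same name to the working directory and throws.
      FileRegistry.register("a.text", "/opt/spark/work-dir/a.text")
    }
    ```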
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, other than the bug fix itself: jobs that previously failed with the exception above now succeed.
    
    ### How was this patch tested?
    
    Existing unit tests, plus a new test added to SparkSubmitSuite in this PR.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51037 from TongWei1105/SPARK-52334.
    
    Authored-by: TongWei1105 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      |  6 ++--
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 42 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c3215b16f25e..6872c7c3bd71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -448,14 +448,16 @@ private[spark] class SparkSubmit extends Logging {
                 log" from ${MDC(LogKeys.SOURCE_PATH, source)}" +
                 log" to ${MDC(LogKeys.DESTINATION_PATH, dest)}")
               Utils.deleteRecursively(dest)
-              if (isArchive) {
+              val resourceUri = if (isArchive) {
                 Utils.unpack(source, dest)
+                localResources
               } else {
                 Files.copy(source.toPath, dest.toPath)
+                dest.toURI
               }
               // Keep the URIs of local files with the given fragments.
               Utils.getUriBuilder(
-                localResources).fragment(resolvedUri.getFragment).build().toString
+                resourceUri).fragment(resolvedUri.getFragment).build().toString
           } ++ avoidDownloads.map(_.toString)).mkString(",")
         }
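
The net effect of the hunk above: archives are unpacked in place and keep their original localResources URI, while plain files are copied and re-pointed at their working-directory copy. A minimal sketch of the new behavior follows; the paths are illustrative assumptions, not values from this patch:

```scala
import java.io.File
import java.net.URI

object UriDemo extends App {
  // Simplified model of the fixed branch: archives keep their unpacked URI,
  // non-archives are re-pointed at the working-directory copy (dest.toURI).
  def rewrittenUri(isArchive: Boolean, localResources: URI, dest: File): URI =
    if (isArchive) localResources else dest.toURI

  val tempUri = new URI("file:/tmp/spark-6aa5129d/a.text")  // temp download path
  val workDirCopy = new File("/opt/spark/work-dir/a.text")  // working-dir copy

  // Non-archive files now resolve to the working directory, so a later
  // sc.addFile("a.text") sees the same path and no longer throws.
  assert(rewrittenUri(isArchive = false, tempUri, workDirCopy) == workDirCopy.toURI)
}
```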
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0a44045742ff..18d3c35ea94f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1845,6 +1845,48 @@ class SparkSubmitSuite
     assert(classpath.contains("."))
   }
 
+  test("SPARK-52334: Update all files, jars, and pyFiles to" +
+    "reference the working directory after they are downloaded") {
+    withTempDir { dir =>
+      val text1 = File.createTempFile("test1_", ".txt", dir)
+      val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+      TestUtils.createJar(Seq(text1), zipFile1)
+      val testFile = "test_metrics_config.properties"
+      val testPyFile = "test_metrics_system.properties"
+      val testJar = "TestUDTF.jar"
+      val clArgs = Seq(
+        "--deploy-mode", "client",
+        "--proxy-user", "test.user",
+        "--master", "k8s://host:port",
+        "--executor-memory", "5g",
+        "--class", "org.SomeClass",
+        "--driver-memory", "4g",
+        "--conf", "spark.kubernetes.namespace=spark",
+        "--conf", "spark.kubernetes.driver.container.image=bar",
+        "--conf", "spark.kubernetes.submitInDriver=true",
+        "--files", s"src/test/resources/$testFile",
+        "--py-files", s"src/test/resources/$testPyFile",
+        "--jars", s"src/test/resources/$testJar",
+        "--archives", s"${zipFile1.getAbsolutePath}#test_archives",
+        "/home/thejar.jar",
+        "arg1")
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val _ = submit.prepareSubmitEnvironment(appArgs)
+
+      appArgs.files should be (Utils.resolveURIs(s"$testFile,$testPyFile"))
+      appArgs.pyFiles should be (Utils.resolveURIs(testPyFile))
+      appArgs.jars should be (Utils.resolveURIs(testJar))
+      appArgs.archives should be (Utils.resolveURIs(s"${zipFile1.getAbsolutePath}#test_archives"))
+
+      Files.isDirectory(Paths.get("test_archives")) should be(true)
+      Files.delete(Paths.get(testFile))
+      Files.delete(Paths.get(testPyFile))
+      Files.delete(Paths.get(testJar))
+      Files.delete(Paths.get(s"test_archives/${text1.getName}"))
+      Files.delete(Paths.get("test_archives/META-INF/MANIFEST.MF"))
+    }
+  }
+
   // Requires Python dependencies for Spark Connect. Should be enabled by default.
   ignore("Spark Connect application submission (Python)") {
     val pyFile = File.createTempFile("remote_test", ".py")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
