This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dd418e3b3502 [SPARK-52334][CORE][K8S] Update all files, jars, and pyFiles to reference the working directory after they are downloaded
dd418e3b3502 is described below
commit dd418e3b35026f09e75d0a30f99c8e27b02aee0a
Author: TongWei1105 <[email protected]>
AuthorDate: Fri Dec 5 09:25:25 2025 -0800
[SPARK-52334][CORE][K8S] Update all files, jars, and pyFiles to reference the working directory after they are downloaded
### What changes were proposed in this pull request?
This PR fixes a bug where submitting a Spark job with the --files option and
then calling SparkContext.addFile() for a file with the same name causes Spark
to throw:

`Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: File a.text was already registered with a different path (old path =
/tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path =
/opt/spark/work-dir/a.text)`
### Why are the changes needed?
Steps to reproduce:
1. Submit a Spark application using spark-submit with the --files option:
`bin/spark-submit --files s3://bucket/a.text --class testDemo app.jar`
2. In the testDemo application code, call:
`sc.addFile("a.text", true)`
This works correctly in YARN mode but throws the error above in Kubernetes
mode; a minimal sketch of such a driver follows.
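For illustration, a minimal sketch of the driver described above; the class
name and file name come from the repro steps, and everything else is
hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver for the repro: "a.text" was already shipped via
// --files, and is then registered again from application code.
object testDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("testDemo").getOrCreate()
    // Before this fix, this call failed on Kubernetes because --files had
    // registered "a.text" under a temporary download path.
    spark.sparkContext.addFile("a.text", true)
    spark.stop()
  }
}
```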
After [SPARK-33782](https://issues.apache.org/jira/browse/SPARK-33782), in
Kubernetes mode, --files, --jars, --archiveFiles, and --pyFiles are all
downloaded to the working directory.
However, in the code, args.files = filesLocalFiles, and filesLocalFiles
refers to a temporary download path, not the working directory.
This causes issues when user code like testDemo calls sc.addFile("a.text",
true), resulting in an error such as:
`Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: File a.text was already registered with a different path (old path =
/tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text, new path =
/opt/spark/work-dir/a.text`
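The message comes from a consistency check that requires a file name, once
registered, to always map to the same path. A simplified sketch of that kind
of check (illustrative only, not the actual Spark source):

```scala
import java.util.concurrent.ConcurrentHashMap

object RegistrationCheckSketch {
  // file name -> first path it was registered under
  private val addedFiles = new ConcurrentHashMap[String, String]()

  def register(name: String, newPath: String): Unit = {
    val oldPath = addedFiles.putIfAbsent(name, newPath)
    require(oldPath == null || oldPath == newPath,
      s"File $name was already registered with a different path " +
        s"(old path = $oldPath, new path = $newPath)")
  }

  def main(args: Array[String]): Unit = {
    // The --files download registers the temporary path first...
    register("a.text", "/tmp/spark-6aa5129d-5bbb-464a-9e50-5b6ffe364ffb/a.text")
    // ...then sc.addFile("a.text", true) resolves to the working directory
    // and trips the requirement above.
    register("a.text", "/opt/spark/work-dir/a.text")
  }
}
```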
### Does this PR introduce _any_ user-facing change?
Yes. The exception described above no longer occurs after this PR.
### How was this patch tested?
Existing unit tests, plus the new test case added to SparkSubmitSuite in this PR.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51037 from TongWei1105/SPARK-52334.
Authored-by: TongWei1105 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/SparkSubmit.scala | 6 ++--
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 42 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c3215b16f25e..6872c7c3bd71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -448,14 +448,16 @@ private[spark] class SparkSubmit extends Logging {
               log" from ${MDC(LogKeys.SOURCE_PATH, source)}" +
               log" to ${MDC(LogKeys.DESTINATION_PATH, dest)}")
             Utils.deleteRecursively(dest)
-            if (isArchive) {
+            val resourceUri = if (isArchive) {
               Utils.unpack(source, dest)
+              localResources
             } else {
               Files.copy(source.toPath, dest.toPath)
+              dest.toURI
             }
             // Keep the URIs of local files with the given fragments.
             Utils.getUriBuilder(
-              localResources).fragment(resolvedUri.getFragment).build().toString
+              resourceUri).fragment(resolvedUri.getFragment).build().toString
         } ++ avoidDownloads.map(_.toString)).mkString(",")
     }
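For readability outside the diff, the resulting logic in SparkSubmit.scala,
condensed (identifiers as in the patch; surrounding code elided):

    // Non-archive resources now resolve to their destination in the working
    // directory, while unpacked archives keep the downloaded URI.
    Utils.deleteRecursively(dest)
    val resourceUri = if (isArchive) {
      Utils.unpack(source, dest) // extracted into the working directory
      localResources             // archives keep the downloaded URI
    } else {
      Files.copy(source.toPath, dest.toPath)
      dest.toURI                 // files, jars, and pyFiles now point at the working directory
    }
    // Keep the URIs of local files with the given fragments.
    Utils.getUriBuilder(resourceUri).fragment(resolvedUri.getFragment).build().toString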
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0a44045742ff..18d3c35ea94f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1845,6 +1845,48 @@ class SparkSubmitSuite
     assert(classpath.contains("."))
   }
 
+  test("SPARK-52334: Update all files, jars, and pyFiles to " +
+    "reference the working directory after they are downloaded") {
+    withTempDir { dir =>
+      val text1 = File.createTempFile("test1_", ".txt", dir)
+      val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+      TestUtils.createJar(Seq(text1), zipFile1)
+      val testFile = "test_metrics_config.properties"
+      val testPyFile = "test_metrics_system.properties"
+      val testJar = "TestUDTF.jar"
+      val clArgs = Seq(
+        "--deploy-mode", "client",
+        "--proxy-user", "test.user",
+        "--master", "k8s://host:port",
+        "--executor-memory", "5g",
+        "--class", "org.SomeClass",
+        "--driver-memory", "4g",
+        "--conf", "spark.kubernetes.namespace=spark",
+        "--conf", "spark.kubernetes.driver.container.image=bar",
+        "--conf", "spark.kubernetes.submitInDriver=true",
+        "--files", s"src/test/resources/$testFile",
+        "--py-files", s"src/test/resources/$testPyFile",
+        "--jars", s"src/test/resources/$testJar",
+        "--archives", s"${zipFile1.getAbsolutePath}#test_archives",
+        "/home/thejar.jar",
+        "arg1")
+      val appArgs = new SparkSubmitArguments(clArgs)
+      val _ = submit.prepareSubmitEnvironment(appArgs)
+
+      appArgs.files should be (Utils.resolveURIs(s"$testFile,$testPyFile"))
+      appArgs.pyFiles should be (Utils.resolveURIs(testPyFile))
+      appArgs.jars should be (Utils.resolveURIs(testJar))
+      appArgs.archives should be (Utils.resolveURIs(s"${zipFile1.getAbsolutePath}#test_archives"))
+
+      Files.isDirectory(Paths.get("test_archives")) should be(true)
+      Files.delete(Paths.get(testFile))
+      Files.delete(Paths.get(testPyFile))
+      Files.delete(Paths.get(testJar))
+      Files.delete(Paths.get(s"test_archives/${text1.getName}"))
+      Files.delete(Paths.get("test_archives/META-INF/MANIFEST.MF"))
+    }
+  }
+
   // Requires Python dependencies for Spark Connect. Should be enabled by default.
   ignore("Spark Connect application submission (Python)") {
     val pyFile = File.createTempFile("remote_test", ".py")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]