This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new b831d826ba8 [SPARK-44476][CORE][CONNECT] Fix population of artifacts
for a JobArtifactState with no associated artifacts
b831d826ba8 is described below
commit b831d826ba8443ca6195704218045dceae632d97
Author: vicennial <[email protected]>
AuthorDate: Wed Jul 19 09:36:51 2023 +0900
[SPARK-44476][CORE][CONNECT] Fix population of artifacts for a
JobArtifactState with no associated artifacts
### What changes were proposed in this pull request?
When a `JobArtifactSet` is created from a specific `JobArtifactState`, an
empty collection of files/jars/archives is now returned if no artifacts of
that type have previously been associated with that state, rather than all
files/jars/archives.
### Why are the changes needed?
Consider each artifact type - files/jars/archives. For each artifact type,
the following bug exists:
1. Initialise a `JobArtifactState` with no artifacts added to it.
2. Create a `JobArtifactSet` from the `JobArtifactState`.
3. Add an artifact with the same active `JobArtifactState`.
4. Create another `JobArtifactSet`
In the current behaviour, the set created in step 2 contains all existing
artifacts of that type (e.g. via `sc.allAddedFiles`), whereas the set created
in step 4 contains only the single artifact added in step 3.
### Does this PR introduce _any_ user-facing change?
No. (This bug fix corrects previously incorrect behaviour.)
### How was this patch tested?
New unit test in `JobArtifactSetSuite`.
Closes #42062 from vicennial/SPARK-44476.
Authored-by: vicennial <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 98de9a0ad46086bfad5d89a66540a24e39d2f029)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/JobArtifactSet.scala | 6 ++--
.../org/apache/spark/JobArtifactSetSuite.scala | 36 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
index 54922f5783a..c304e15f358 100644
--- a/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
+++ b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
@@ -110,13 +110,13 @@ private[spark] object JobArtifactSet {
new JobArtifactSet(
state = maybeState,
jars = maybeState
- .map(s => sc.addedJars.getOrElse(s.uuid, sc.allAddedJars))
+ .map(s => sc.addedJars.getOrElse(s.uuid, Map.empty[String, Long]))
.getOrElse(sc.allAddedJars).toMap,
files = maybeState
- .map(s => sc.addedFiles.getOrElse(s.uuid, sc.allAddedFiles))
+ .map(s => sc.addedFiles.getOrElse(s.uuid, Map.empty[String, Long]))
.getOrElse(sc.allAddedFiles).toMap,
archives = maybeState
- .map(s => sc.addedArchives.getOrElse(s.uuid, sc.allAddedArchives))
+ .map(s => sc.addedArchives.getOrElse(s.uuid, Map.empty[String, Long]))
.getOrElse(sc.allAddedArchives).toMap)
}
}
diff --git a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
index 66d02e8b511..bf1cb4dded8 100644
--- a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
@@ -86,4 +86,40 @@ class JobArtifactSetSuite extends SparkFunSuite with
LocalSparkContext {
assert(JobArtifactSet.getActiveOrDefault(sc).state.isEmpty)
}
}
+
+ test("SPARK-44476: JobArtifactState is not populated with all artifacts if
none are " +
+ "explicitly added to it.") {
+ withTempDir { dir =>
+ val conf = new SparkConf()
+ .setAppName("test")
+ .setMaster("local")
+ .set("spark.repl.class.uri", "dummyUri")
+ sc = new SparkContext(conf)
+
+ val jarPath = File.createTempFile("testJar", ".jar", dir).getAbsolutePath
+ val filePath = File.createTempFile("testFile", ".txt",
dir).getAbsolutePath
+ val fileToZip = File.createTempFile("testFile", "", dir).getAbsolutePath
+ val archivePath = s"$fileToZip.zip"
+ createZipFile(fileToZip, archivePath)
+
+ val otherJobArtifactState = JobArtifactState("other", Some("state"))
+
+ JobArtifactSet.withActiveJobArtifactState(otherJobArtifactState) {
+ sc.addJar(jarPath)
+ sc.addFile(filePath)
+ sc.addArchive(archivePath)
+ }
+
+ val artifactState = JobArtifactState("abc", Some("xyz"))
+ JobArtifactSet.withActiveJobArtifactState(artifactState) {
+ val jobArtifactSet = JobArtifactSet.getActiveOrDefault(sc)
+
+ // Artifacts from the other state must be not visible to this state.
+ assert(jobArtifactSet.state.contains(artifactState))
+ assert(jobArtifactSet.jars.isEmpty)
+ assert(jobArtifactSet.files.isEmpty)
+ assert(jobArtifactSet.archives.isEmpty)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]