This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b831d826ba8 [SPARK-44476][CORE][CONNECT] Fix population of artifacts 
for a JobArtifactState with no associated artifacts
b831d826ba8 is described below

commit b831d826ba8443ca6195704218045dceae632d97
Author: vicennial <[email protected]>
AuthorDate: Wed Jul 19 09:36:51 2023 +0900

    [SPARK-44476][CORE][CONNECT] Fix population of artifacts for a 
JobArtifactState with no associated artifacts
    
    ### What changes were proposed in this pull request?
    
    When a `JobArtifactSet` is created form a specific `JobArtifactState`, an 
empty collection of files/jars/archives is returned if there have previously 
been no associated artifacts of that specific type rather than all 
files/jars/archives.
    
    ### Why are the changes needed?
    
    Consider each artifact type - files/jars/archives. For each artifact type, 
the following bug exists:
    
    1. Initialise a `JobArtifactState` with no artifacts added to it.
    2. Create a  `JobArtifactSet` from the `JobArtifactState`.
    3. Add an artifact with the same active `JobArtifactState`.
    4. Create another `JobArtifactSet`
    
    In the current behaviour, the set created in step 2 contains all existing 
artifacts of that type (through `sc.allAddedFiles` for example) while step 4 
would only contain the single artifact added in step 3.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. (Bug-fix addresses the previously incorrect user behaviour)
    
    ### How was this patch tested?
    
    New unit test in `JobArtifactSetSuite`.
    
    Closes #42062 from vicennial/SPARK-44476.
    
    Authored-by: vicennial <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 98de9a0ad46086bfad5d89a66540a24e39d2f029)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/JobArtifactSet.scala    |  6 ++--
 .../org/apache/spark/JobArtifactSetSuite.scala     | 36 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/JobArtifactSet.scala 
b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
index 54922f5783a..c304e15f358 100644
--- a/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
+++ b/core/src/main/scala/org/apache/spark/JobArtifactSet.scala
@@ -110,13 +110,13 @@ private[spark] object JobArtifactSet {
     new JobArtifactSet(
       state = maybeState,
       jars = maybeState
-        .map(s => sc.addedJars.getOrElse(s.uuid, sc.allAddedJars))
+        .map(s => sc.addedJars.getOrElse(s.uuid, Map.empty[String, Long]))
         .getOrElse(sc.allAddedJars).toMap,
       files = maybeState
-        .map(s => sc.addedFiles.getOrElse(s.uuid, sc.allAddedFiles))
+        .map(s => sc.addedFiles.getOrElse(s.uuid, Map.empty[String, Long]))
         .getOrElse(sc.allAddedFiles).toMap,
       archives = maybeState
-        .map(s => sc.addedArchives.getOrElse(s.uuid, sc.allAddedArchives))
+        .map(s => sc.addedArchives.getOrElse(s.uuid, Map.empty[String, Long]))
         .getOrElse(sc.allAddedArchives).toMap)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala 
b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
index 66d02e8b511..bf1cb4dded8 100644
--- a/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobArtifactSetSuite.scala
@@ -86,4 +86,40 @@ class JobArtifactSetSuite extends SparkFunSuite with 
LocalSparkContext {
       assert(JobArtifactSet.getActiveOrDefault(sc).state.isEmpty)
     }
   }
+
+  test("SPARK-44476: JobArtifactState is not populated with all artifacts if 
none are " +
+    "explicitly added to it.") {
+    withTempDir { dir =>
+      val conf = new SparkConf()
+        .setAppName("test")
+        .setMaster("local")
+        .set("spark.repl.class.uri", "dummyUri")
+      sc = new SparkContext(conf)
+
+      val jarPath = File.createTempFile("testJar", ".jar", dir).getAbsolutePath
+      val filePath = File.createTempFile("testFile", ".txt", 
dir).getAbsolutePath
+      val fileToZip = File.createTempFile("testFile", "", dir).getAbsolutePath
+      val archivePath = s"$fileToZip.zip"
+      createZipFile(fileToZip, archivePath)
+
+      val otherJobArtifactState = JobArtifactState("other", Some("state"))
+
+      JobArtifactSet.withActiveJobArtifactState(otherJobArtifactState) {
+        sc.addJar(jarPath)
+        sc.addFile(filePath)
+        sc.addArchive(archivePath)
+      }
+
+      val artifactState = JobArtifactState("abc", Some("xyz"))
+      JobArtifactSet.withActiveJobArtifactState(artifactState) {
+        val jobArtifactSet = JobArtifactSet.getActiveOrDefault(sc)
+
+        // Artifacts from the other state must be not visible to this state.
+        assert(jobArtifactSet.state.contains(artifactState))
+        assert(jobArtifactSet.jars.isEmpty)
+        assert(jobArtifactSet.files.isEmpty)
+        assert(jobArtifactSet.archives.isEmpty)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to