This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a837105631ec [SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend
a837105631ec is described below

commit a837105631ec581fc485ed97e13f0dedf446f2bd
Author: Bo Xiong <[email protected]>
AuthorDate: Sat Sep 30 12:04:34 2023 -0500

    [SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend
    
    Backport of #43021 to branch 3.3
    
    ### What changes were proposed in this pull request?
    Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an 
executor process randomly gets stuck.
    
    ### Why are the changes needed?
    For each executor, the single-threaded dispatcher can run into an "infinite 
loop" (as explained in the SPARK-45227). Once an executor process runs into a 
state, it'd stop launching tasks from the driver or reporting task status back.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    ```
    $ build/mvn package -DskipTests -pl core
    $ build/mvn -Dtest=none 
-DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite 
test
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43176 from xiongbo-sjtu/branch-3.3.
    
    Authored-by: Bo Xiong <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala  | 11 +++++++----
 .../spark/executor/CoarseGrainedExecutorBackendSuite.scala    |  6 +++---
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4903421f9063..8d89baf54a23 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,9 +20,9 @@ package org.apache.spark.executor
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
@@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
   /**
    * Map each taskId to the information about the resource allocated to it, 
Please refer to
    * [[ResourceInformation]] for specifics.
+   * CHM is used to ensure thread-safety 
(https://issues.apache.org/jira/browse/SPARK-45227)
    * Exposed for testing only.
    */
-  private[executor] val taskResources = new mutable.HashMap[Long, Map[String, 
ResourceInformation]]
+  private[executor] val taskResources = new ConcurrentHashMap[
+    Long, Map[String, ResourceInformation]
+  ]
 
   private var decommissioned = false
 
@@ -184,7 +187,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       } else {
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
-        taskResources(taskDesc.taskId) = taskDesc.resources
+        taskResources.put(taskDesc.taskId, taskDesc.resources)
         executor.launchTask(this, taskDesc)
       }
 
@@ -261,7 +264,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): 
Unit = {
-    val resources = taskResources.getOrElse(taskId, Map.empty[String, 
ResourceInformation])
+    val resources = taskResources.getOrDefault(taskId, Map.empty[String, 
ResourceInformation])
     val msg = StatusUpdate(executorId, taskId, state, data, resources)
     if (TaskState.isFinished(state)) {
       taskResources.remove(taskId)
diff --git 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index a12b7034a6df..c6cd7560c8c7 100644
--- 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -300,7 +300,7 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
           resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
       assert(backend.taskResources.isEmpty)
 
-      val taskId = 1000000
+      val taskId = 1000000L
       // We don't really verify the data, just pass it around.
       val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
       val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 
1000000", 19,
@@ -314,14 +314,14 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
       backend.self.send(LaunchTask(new 
SerializableBuffer(serializedTaskDescription)))
       eventually(timeout(10.seconds)) {
         assert(backend.taskResources.size == 1)
-        val resources = backend.taskResources(taskId)
+        val resources = backend.taskResources.get(taskId)
         assert(resources(GPU).addresses sameElements Array("0", "1"))
       }
 
       // Update the status of a running task shall not affect `taskResources` 
map.
       backend.statusUpdate(taskId, TaskState.RUNNING, data)
       assert(backend.taskResources.size == 1)
-      val resources = backend.taskResources(taskId)
+      val resources = backend.taskResources.get(taskId)
       assert(resources(GPU).addresses sameElements Array("0", "1"))
 
       // Update the status of a finished task shall remove the entry from 
`taskResources` map.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to