This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new a837105631ec [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
a837105631ec is described below
commit a837105631ec581fc485ed97e13f0dedf446f2bd
Author: Bo Xiong <[email protected]>
AuthorDate: Sat Sep 30 12:04:34 2023 -0500
[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
Backport of #43021 to branch 3.3
### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an
executor process randomly gets stuck.
### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into that state, it stops launching tasks from the driver and stops reporting task status back.
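As a rough illustration (not the actual Spark code: `ResourceInfo`, the object name, and the thread roles below are placeholders), the sketch shows the kind of concurrent put/get/remove access pattern on `taskResources` that an unsynchronized `mutable.HashMap` cannot safely handle and a `ConcurrentHashMap` can:
```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object TaskResourceMapSketch {
  // Placeholder for Spark's ResourceInformation, for illustration only.
  final case class ResourceInfo(name: String, addresses: Seq[String])

  def main(args: Array[String]): Unit = {
    // Thread-safe map, mirroring the fix: one thread puts entries while
    // another reads and removes them concurrently. With an unsynchronized
    // mutable.HashMap, the same interleaving can corrupt the map's internals.
    val taskResources = new ConcurrentHashMap[Long, Map[String, ResourceInfo]]()

    val pool = Executors.newFixedThreadPool(2)

    // "Dispatcher" thread: registers resources when a task is launched.
    pool.submit(new Runnable {
      override def run(): Unit = (1L to 100000L).foreach { taskId =>
        taskResources.put(taskId, Map("gpu" -> ResourceInfo("gpu", Seq("0", "1"))))
      }
    })

    // "Status update" thread: looks up and removes entries as tasks finish.
    pool.submit(new Runnable {
      override def run(): Unit = (1L to 100000L).foreach { taskId =>
        val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInfo])
        if (resources.nonEmpty) taskResources.remove(taskId)
      }
    })

    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}
```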
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43176 from xiongbo-sjtu/branch-3.3.
Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 11 +++++++----
.../spark/executor/CoarseGrainedExecutorBackendSuite.scala | 6 +++---
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4903421f9063..8d89baf54a23 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,9 +20,9 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
@@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
/**
* Map each taskId to the information about the resource allocated to it, Please refer to
* [[ResourceInformation]] for specifics.
+ * CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
* Exposed for testing only.
*/
- private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
+ private[executor] val taskResources = new ConcurrentHashMap[
+ Long, Map[String, ResourceInformation]
+ ]
private var decommissioned = false
@@ -184,7 +187,7 @@ private[spark] class CoarseGrainedExecutorBackend(
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
- taskResources(taskDesc.taskId) = taskDesc.resources
+ taskResources.put(taskDesc.taskId, taskDesc.resources)
executor.launchTask(this, taskDesc)
}
@@ -261,7 +264,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
- val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
+ val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
val msg = StatusUpdate(executorId, taskId, state, data, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index a12b7034a6df..c6cd7560c8c7 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -300,7 +300,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
assert(backend.taskResources.isEmpty)
- val taskId = 1000000
+ val taskId = 1000000L
// We don't really verify the data, just pass it around.
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
@@ -314,14 +314,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
eventually(timeout(10.seconds)) {
assert(backend.taskResources.size == 1)
- val resources = backend.taskResources(taskId)
+ val resources = backend.taskResources.get(taskId)
assert(resources(GPU).addresses sameElements Array("0", "1"))
}
// Update the status of a running task shall not affect `taskResources` map.
backend.statusUpdate(taskId, TaskState.RUNNING, data)
assert(backend.taskResources.size == 1)
- val resources = backend.taskResources(taskId)
+ val resources = backend.taskResources.get(taskId)
assert(resources(GPU).addresses sameElements Array("0", "1"))
// Update the status of a finished task shall remove the entry from `taskResources` map.