This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new b2ff4c4f7ec [SPARK-39696][CORE] Fix data race in access to
TaskMetrics.externalAccums
b2ff4c4f7ec is described below
commit b2ff4c4f7ec21d41cb173b413bd5aa5feefd7eee
Author: Emil Ejbyfeldt <[email protected]>
AuthorDate: Fri Apr 7 10:14:07 2023 +0900
[SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums
### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to
`TaskMetrics.externalAccums`. The race occurs between the
`executor-heartbeater` thread and the thread executing the task. This data race
is not known to cause issues on 2.12 but in 2.13 ~due to this change
https://github.com/scala/scala/pull/9258~ (LuciferYang bisected this to first
cause failures in scala 2.13.7 one possible reason could be
https://github.com/scala/scala/pull/9786) leads to an uncaught exception in the
`executor-hea [...]
This fix of using `CopyOnWriteArrayList` is cherry-picked from
https://github.com/apache/spark/pull/37206 where it was suggested as a fix by
LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside
the class `TaskMetrics`. The old PR was closed because at that point there was
no clear understanding of the race condition. JoshRosen commented here
https://github.com/apache/spark/pull/37206#issuecomment-1189930626 saying that
there should be no such race base [...]
### Why are the changes needed?
The current code has a data race.
### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-heartbeater` thread when
using scala 2.13.
### How was this patch tested?
This patch adds a new test case that, before the fix was applied,
consistently produced the uncaught exception in the heartbeater thread when
using scala 2.13.
Closes #40663 from eejbyfeldt/SPARK-39696.
Lead-authored-by: Emil Ejbyfeldt <[email protected]>
Co-authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 6ce0822f76e11447487d5f6b3cce94a894f2ceef)
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/executor/TaskMetrics.scala | 10 +++++++---
.../org/apache/spark/executor/ExecutorSuite.scala | 22 ++++++++++++++++++++++
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1ca8590b1c9..78b39b0cbda 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@
package org.apache.spark.executor
+import java.util.concurrent.CopyOnWriteArrayList
+
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
@@ -262,10 +264,12 @@ class TaskMetrics private[spark] () extends Serializable {
/**
* External accumulators registered with this task.
*/
- @transient private[spark] lazy val externalAccums = new
ArrayBuffer[AccumulatorV2[_, _]]
+ @transient private[spark] lazy val _externalAccums = new
CopyOnWriteArrayList[AccumulatorV2[_, _]]
+
+ private[spark] def externalAccums = _externalAccums.asScala
private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
- externalAccums += a
+ _externalAccums.add(a)
}
private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums
++ externalAccums
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
tmAcc.metadata = acc.metadata
tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
} else {
- tm.externalAccums += acc
+ tm._externalAccums.add(acc)
}
}
tm
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index bef36d08e8a..46f41195ebd 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map}
import scala.concurrent.duration._
import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.logging.log4j._
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{inOrder, verify, when}
@@ -270,6 +271,27 @@ class ExecutorSuite extends SparkFunSuite
heartbeatZeroAccumulatorUpdateTest(false)
}
+ test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
+ val conf = new SparkConf().setMaster("local").setAppName("executor suite
test")
+ conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
+ sc = new SparkContext(conf)
+
+ val accums = (1 to 10).map(i =>
sc.longAccumulator(s"mapperRunAccumulator$i"))
+ val input = sc.parallelize(1 to 10, 10)
+ var testRdd = input.map(i => (i, i))
+ (0 to 10).foreach( i =>
+ testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2)
}).reduceByKey(_ + _)
+ )
+
+ val logAppender = new LogAppender("heartbeat thread should not die")
+ withLogAppender(logAppender, level = Some(Level.ERROR)) {
+ val _ = testRdd.count()
+ }
+ val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+ .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
+ assert(logs.isEmpty)
+ }
+
private def withMockHeartbeatReceiverRef(executor: Executor)
(func: RpcEndpointRef => Unit): Unit = {
val executorClass = classOf[Executor]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]