This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new dd825e8  [SPARK-34731][CORE] Avoid ConcurrentModificationException 
when redacting properties in EventLoggingListener
dd825e8 is described below

commit dd825e82bb90c015b204c2e2e5eb3664450a9121
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Thu Mar 18 14:59:57 2021 +0900

    [SPARK-34731][CORE] Avoid ConcurrentModificationException when redacting 
properties in EventLoggingListener
    
    ### What changes were proposed in this pull request?
    
    Change DAGScheduler to pass a clone of the Properties object, rather than 
the original object, to the SparkListenerJobStart event.
    
    ### Why are the changes needed?
    
    DAGScheduler might modify the Properties object (e.g., in 
addPySparkConfigsToProperties) after firing off the SparkListenerJobStart 
event. Since the handler for that event (onJobStart in EventLoggingListener) 
iterates over the elements of the Properties object asynchronously, this 
sometimes results in a ConcurrentModificationException.
    
    This can be demonstrated using these steps:
    ```
    $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
    --conf spark.executor.cores=1 --driver-memory 4g --conf \
    "spark.ui.showConsoleProgress=false" \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=/tmp/spark-events
    ...
    scala> (0 to 500).foreach { i =>
         |   val df = spark.range(0, 20000).toDF("a")
         |   df.filter("a > 12").count
         | }
    21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener 
threw an exception
    java.util.ConcurrentModificationException
        at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
    ```
    
    I've not actually seen a ConcurrentModificationException in 
onStageSubmitted, only in onJobStart. However, they both iterate over the 
Properties object, so for safety's sake I pass a clone to 
SparkListenerStageSubmitted as well.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By repeatedly running the reproduction steps from above.
    
    Closes #31826 from bersprockets/elconcurrent.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit f8a8b340b3a69fd10514af03085d77027b84e617)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 17 +++++++++++------
 core/src/main/scala/org/apache/spark/util/Utils.scala   |  3 +++
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f333cee..549f627 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -890,10 +890,11 @@ private[spark] class DAGScheduler(
       timeout: Long,
       properties: Properties): PartialResult[R] = {
     val jobId = nextJobId.getAndIncrement()
+    val clonedProperties = Utils.cloneProperties(properties)
     if (rdd.partitions.isEmpty) {
       // Return immediately if the job is running 0 tasks
       val time = clock.getTimeMillis()
-      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), 
properties))
+      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), 
clonedProperties))
       listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
       return new PartialResult(evaluator.currentResult(), true)
     }
@@ -901,7 +902,7 @@ private[spark] class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
-      Utils.cloneProperties(properties)))
+      clonedProperties))
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
@@ -1162,7 +1163,8 @@ private[spark] class DAGScheduler(
     val stageIds = jobIdToStageIds(jobId).toArray
     val stageInfos = stageIds.flatMap(id => 
stageIdToStage.get(id).map(_.latestInfo))
     listenerBus.post(
-      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, 
properties))
+      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
+        Utils.cloneProperties(properties)))
     submitStage(finalStage)
   }
 
@@ -1200,7 +1202,8 @@ private[spark] class DAGScheduler(
     val stageIds = jobIdToStageIds(jobId).toArray
     val stageInfos = stageIds.flatMap(id => 
stageIdToStage.get(id).map(_.latestInfo))
     listenerBus.post(
-      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, 
properties))
+      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
+        Utils.cloneProperties(properties)))
     submitStage(finalStage)
 
     // If the whole stage has already finished, tell the listener and remove it
@@ -1333,7 +1336,8 @@ private[spark] class DAGScheduler(
     } catch {
       case NonFatal(e) =>
         stage.makeNewStageAttempt(partitionsToCompute.size)
-        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, 
properties))
+        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
+          Utils.cloneProperties(properties)))
         abortStage(stage, s"Task creation failed: 
$e\n${Utils.exceptionString(e)}", Some(e))
         runningStages -= stage
         return
@@ -1347,7 +1351,8 @@ private[spark] class DAGScheduler(
     if (partitionsToCompute.nonEmpty) {
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     }
-    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
+    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
+      Utils.cloneProperties(properties)))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it 
multiple times.
     // Broadcasted binary for the task, used to dispatch tasks to executors. 
Note that we broadcast
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0888cbd..080d3bb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3000,6 +3000,9 @@ private[spark] object Utils extends Logging {
 
   /** Create a new properties object with the same values as `props` */
   def cloneProperties(props: Properties): Properties = {
+    if (props == null) {
+      return props
+    }
     val resultProps = new Properties()
     props.forEach((k, v) => resultProps.put(k, v))
     resultProps


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to