Repository: spark
Updated Branches:
  refs/heads/master 1ffa608ba -> 7920296bf


[SPARK-15430][SQL] Fix potential ConcurrentModificationException for ListAccumulator

## What changes were proposed in this pull request?

In `ListAccumulator` we create an unmodifiable view of the underlying list. 
However, the view does not prevent the underlying list from being modified. 
While a caller is reading the unmodifiable view, the underlying list can be 
modified at the same time, which can cause a 
`java.util.ConcurrentModificationException`. This exception has been observed 
in recent tests.

To fix this, we copy the underlying list and create the unmodifiable view of 
the copy instead.
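
As an illustration only (not part of the patch), here is a minimal Scala sketch of the difference between a live unmodifiable view and a copied snapshot. `SnapshotSketch` and its members are hypothetical names. Guarding writes with the same lock is an assumption of the sketch, not something this diff changes. The sketch triggers the exception deterministically in a single thread by appending during iteration, rather than through a real race.

```scala
import java.util.{ArrayList, Collections, ConcurrentModificationException, List => JList}

object SnapshotSketch {
  // Hypothetical stand-in for the accumulator's backing list.
  private val underlying: JList[String] = new ArrayList[String]()

  // Old behaviour: a read-only view that still reflects later writes.
  def liveView: JList[String] = Collections.unmodifiableList(underlying)

  // New behaviour: copy while holding the list's lock, then wrap the copy.
  def snapshot: JList[String] = underlying.synchronized {
    Collections.unmodifiableList(new ArrayList[String](underlying))
  }

  // Assumption: writers take the same lock as the snapshot reader.
  def add(v: String): Unit = underlying.synchronized { underlying.add(v) }

  // Iterates `list` and appends to the backing list after each element.
  // Returns true if the iteration fails with ConcurrentModificationException.
  def failsWhenModifiedDuringIteration(list: JList[String]): Boolean = {
    val it = list.iterator()
    try {
      while (it.hasNext) {
        it.next()
        add("x") // structural change to the backing list mid-iteration
      }
      false
    } catch {
      case _: ConcurrentModificationException => true
    }
  }
}
```

With at least one element in the backing list, iterating `liveView` this way fails on the second call to `next()`, while iterating `snapshot` completes because the copy is unaffected by later appends.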

## How was this patch tested?
The exception is difficult to reproduce in a test. Existing tests should pass.

Author: Liang-Chi Hsieh <[email protected]>

Closes #13211 from viirya/fix-concurrentmodify.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7920296b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7920296b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7920296b

Branch: refs/heads/master
Commit: 7920296bf8f313e010205937d3ebcbbc7b1a1d9e
Parents: 1ffa608
Author: Liang-Chi Hsieh <[email protected]>
Authored: Sun May 22 08:08:46 2016 -0500
Committer: Sean Owen <[email protected]>
Committed: Sun May 22 08:08:46 2016 -0500

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7920296b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 13cb6a2..21ba460 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
+import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
 
 
 class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private val _list: java.util.List[T] = new java.util.ArrayList[T]
+  private val _list: java.util.List[T] = new ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
@@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list)
+  override def value: java.util.List[T] = _list.synchronized {
+    java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+  }
 
   private[spark] def setValue(newValue: java.util.List[T]): Unit = {
     _list.clear()

