Repository: spark
Updated Branches:
  refs/heads/master 83061be69 -> dcfaeadea


[SPARK-15003] Use ConcurrentHashMap in place of HashMap for 
NewAccumulator.originals

## What changes were proposed in this pull request?

This PR proposes to use ConcurrentHashMap in place of HashMap for 
NewAccumulator.originals

This should result in better performance.

## How was this patch tested?

Existing unit test suite

cloud-fan

Author: tedyu <[email protected]>

Closes #12776 from tedyu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dcfaeade
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dcfaeade
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dcfaeade

Branch: refs/heads/master
Commit: dcfaeadea7e0013af98de626dec36306325f73e7
Parents: 83061be
Author: tedyu <[email protected]>
Authored: Sat Apr 30 07:54:53 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Sat Apr 30 07:54:53 2016 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/NewAccumulator.scala | 20 ++++++++------------
 .../apache/spark/InternalAccumulatorSuite.scala |  3 ++-
 2 files changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dcfaeade/core/src/main/scala/org/apache/spark/NewAccumulator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/NewAccumulator.scala 
b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
index aa21ccc..1571e15 100644
--- a/core/src/main/scala/org/apache/spark/NewAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/NewAccumulator.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 import javax.annotation.concurrent.GuardedBy
 
@@ -198,8 +199,7 @@ private[spark] object AccumulatorContext {
    * once the RDDs and user-code that reference them are cleaned up.
    * TODO: Don't use a global map; these should be tied to a SparkContext 
(SPARK-13051).
    */
-  @GuardedBy("AccumulatorContext")
-  private val originals = new java.util.HashMap[Long, 
jl.ref.WeakReference[NewAccumulator[_, _]]]
+  private val originals = new ConcurrentHashMap[Long, 
jl.ref.WeakReference[NewAccumulator[_, _]]]
 
   private[this] val nextId = new AtomicLong(0L)
 
@@ -209,9 +209,7 @@ private[spark] object AccumulatorContext {
    */
   def newId(): Long = nextId.getAndIncrement
 
-  def numAccums: Int = synchronized(originals.size)
-
-  def accumIds: Set[Long] = synchronized(originals.keySet().asScala.toSet)
+  def numAccums: Int = originals.size
 
   /**
    * Register an [[Accumulator]] created on the driver such that it can be 
used on the executors.
@@ -224,23 +222,21 @@ private[spark] object AccumulatorContext {
    * If an [[Accumulator]] with the same ID was already registered, this does 
nothing instead
    * of overwriting it. We will never register same accumulator twice, this is 
just a sanity check.
    */
-  def register(a: NewAccumulator[_, _]): Unit = synchronized {
-    if (!originals.containsKey(a.id)) {
-      originals.put(a.id, new jl.ref.WeakReference[NewAccumulator[_, _]](a))
-    }
+  def register(a: NewAccumulator[_, _]): Unit = {
+    originals.putIfAbsent(a.id, new jl.ref.WeakReference[NewAccumulator[_, 
_]](a))
   }
 
   /**
    * Unregister the [[Accumulator]] with the given ID, if any.
    */
-  def remove(id: Long): Unit = synchronized {
+  def remove(id: Long): Unit = {
     originals.remove(id)
   }
 
   /**
    * Return the [[Accumulator]] registered with the given ID, if any.
    */
-  def get(id: Long): Option[NewAccumulator[_, _]] = synchronized {
+  def get(id: Long): Option[NewAccumulator[_, _]] = {
     Option(originals.get(id)).map { ref =>
       // Since we are storing weak references, we must check whether the 
underlying data is valid.
       val acc = ref.get
@@ -254,7 +250,7 @@ private[spark] object AccumulatorContext {
   /**
    * Clear all registered [[Accumulator]]s. For testing only.
    */
-  def clear(): Unit = synchronized {
+  def clear(): Unit = {
     originals.clear()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dcfaeade/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala 
b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index 972e31c..688eb6b 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -194,7 +194,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with 
LocalSparkContext {
     }
     // Make sure the same set of accumulators is registered for cleanup
     assert(accumsRegistered.size === numInternalAccums * 2)
-    assert(accumsRegistered.toSet === AccumulatorContext.accumIds)
+    assert(accumsRegistered.toSet.size === AccumulatorContext.numAccums)
+    accumsRegistered.foreach(id => assert(AccumulatorContext.get(id) != None))
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to