Repository: spark Updated Branches: refs/heads/master 0dc868e78 -> 86bce7649
SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap MapOutputTrackerWorker.mapStatuses is used concurrently, so it should be thread-safe. This bug has already been fixed in #1328. Nevertheless, since #1328 won't be merged soon, I am sending this trivial fix in the hope that this issue can be resolved sooner. Author: zsxwing <[email protected]> Closes #1541 from zsxwing/SPARK-2634 and squashes the following commits: d450053 [zsxwing] SPARK-2634: Change MapOutputTrackerWorker.mapStatuses to ConcurrentHashMap Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86bce764 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86bce764 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86bce764 Branch: refs/heads/master Commit: 86bce764983f2b14e1bd87fc3f4f938f7a217e1b Parents: 0dc868e Author: zsxwing <[email protected]> Authored: Thu Sep 25 18:24:01 2014 -0700 Committer: Josh Rosen <[email protected]> Committed: Thu Sep 25 18:24:01 2014 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/86bce764/core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 51705c8..f92189b 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -18,10 +18,12 @@ package org.apache.spark import java.io._ +import java.util.concurrent.ConcurrentHashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.{HashSet, HashMap, 
Map} import scala.concurrent.Await +import scala.collection.JavaConversions._ import akka.actor._ import akka.pattern.ask @@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks. * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the * master's corresponding HashMap. + * + * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a + * thread-safe map. */ protected val mapStatuses: Map[Int, Array[MapStatus]] @@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) * MapOutputTrackerMaster. */ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) { - protected val mapStatuses = new HashMap[Int, Array[MapStatus]] + protected val mapStatuses: Map[Int, Array[MapStatus]] = + new ConcurrentHashMap[Int, Array[MapStatus]] } private[spark] object MapOutputTracker { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
