Stephen Boesch created SPARK-2638: ------------------------------------- Summary: Improve concurrency of fetching Map outputs Key: SPARK-2638 URL: https://issues.apache.org/jira/browse/SPARK-2638 Project: Spark Issue Type: Improvement Components: Spark Core Affects Versions: 1.0.0 Environment: All Reporter: Stephen Boesch Priority: Minor Fix For: 1.1.0
This issue was noticed while perusing the MapOutputTracker source code. Notice that the synchronization is on the containing "fetching" collection - which makes ALL fetches wait if any fetch were occurring. The fix is to synchronize instead on the shuffleId (interned as a string to ensure JVM wide visibility). def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") var fetchedStatuses: Array[MapStatus] = null fetching.synchronized { // This is existing code // shuffleId.toString.intern.synchronized { // New Code if (fetching.contains(shuffleId)) { // Someone else is fetching it; wait for them to be done while (fetching.contains(shuffleId)) { try { fetching.wait() } catch { case e: InterruptedException => } } This is only a small code change, but the testcases to prove (a) proper functionality and (b) proper performance improvement are not so trivial. For (b) it is not worthwhile to add a testcase to the codebase. Instead I have added a git project that demonstrates the concurrency/performance improvement using the fine-grained approach . The github project is at https://github.com/javadba/scalatesting.git . Simply run "sbt test". Note: it is unclear how/where to include this ancillary testing/verification information that will not be included in the git PR: i am open for any suggestions - even as far as simply removing references to it. -- This message was sent by Atlassian JIRA (v6.2#6252)