Repository: spark
Updated Branches:
  refs/heads/branch-1.0 99285d064 -> 2f24159c1


SPARK-1145: Memory mapping with many small blocks can cause JVM allocation 
failures

This includes some minor code clean-up as well. The main change is that small 
files are no longer memory mapped. There is a nicer way to write that code block 
using Scala's `Try`, but to keep it easy to backport and as simple as possible, 
I opted for the more explicit but less pretty format.

Author: Patrick Wendell <[email protected]>

Closes #43 from pwendell/block-iter-logging and squashes the following commits:

1cff512 [Patrick Wendell] Small issue from merge.
49f6c269 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into 
block-iter-logging
4943351 [Patrick Wendell] Added a test and feedback on Matei's review
a637a18 [Patrick Wendell] Review feedback and adding rewind() when reading byte 
buffers.
b76b95f [Patrick Wendell] Review feedback
4e1514e [Patrick Wendell] Don't memory map for small files
d238b88 [Patrick Wendell] Some logging and clean-up
(cherry picked from commit 6b3c6e5dd8e74435f71ecdb224db532550ef407b)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f24159c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f24159c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f24159c

Branch: refs/heads/branch-1.0
Commit: 2f24159c134bf47dbc6b9f00010fbae1231925ad
Parents: 99285d0
Author: Patrick Wendell <[email protected]>
Authored: Sun Apr 27 17:40:56 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Sun Apr 27 17:41:08 2014 -0700

----------------------------------------------------------------------
 .../spark/storage/BlockFetcherIterator.scala    | 22 ++++----
 .../org/apache/spark/storage/BlockManager.scala |  3 +-
 .../org/apache/spark/storage/DiskStore.scala    | 16 +++++-
 .../scala/org/apache/spark/util/Utils.scala     |  3 +-
 .../spark/storage/BlockManagerSuite.scala       | 58 ++++++++++++++++++--
 docs/configuration.md                           |  9 +++
 6 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f24159c/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ace9cd5..a02dd94 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -148,6 +148,12 @@ object BlockFetcherIterator {
     }
 
     protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
+      // Make remote requests at most maxBytesInFlight / 5 in length; the 
reason to keep them
+      // smaller than maxBytesInFlight is to allow multiple, parallel fetches 
from up to 5
+      // nodes, rather than blocking on reading output from one node.
+      val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
+      logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: 
" + targetRequestSize)
+
       // Split local and remote blocks. Remote blocks are further split into 
FetchRequests of size
       // at most maxBytesInFlight in order to limit the amount of data in 
flight.
       val remoteRequests = new ArrayBuffer[FetchRequest]
@@ -159,11 +165,6 @@ object BlockFetcherIterator {
           _numBlocksToFetch += localBlocksToFetch.size
         } else {
           numRemote += blockInfos.size
-          // Make our requests at least maxBytesInFlight / 5 in length; the 
reason to keep them
-          // smaller than maxBytesInFlight is to allow multiple, parallel 
fetches from up to 5
-          // nodes, rather than blocking on reading output from one node.
-          val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
-          logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + 
minRequestSize)
           val iterator = blockInfos.iterator
           var curRequestSize = 0L
           var curBlocks = new ArrayBuffer[(BlockId, Long)]
@@ -178,11 +179,12 @@ object BlockFetcherIterator {
             } else if (size < 0) {
               throw new BlockException(blockId, "Negative block size " + size)
             }
-            if (curRequestSize >= minRequestSize) {
+            if (curRequestSize >= targetRequestSize) {
               // Add this FetchRequest
               remoteRequests += new FetchRequest(address, curBlocks)
               curRequestSize = 0
               curBlocks = new ArrayBuffer[(BlockId, Long)]
+              logDebug(s"Creating fetch request of $curRequestSize at 
$address")
             }
           }
           // Add in the final request
@@ -191,7 +193,7 @@ object BlockFetcherIterator {
           }
         }
       }
-      logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of 
" +
+      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
         totalBlocks + " blocks")
       remoteRequests
     }
@@ -226,8 +228,8 @@ object BlockFetcherIterator {
         sendRequest(fetchRequests.dequeue())
       }
 
-      val numGets = remoteRequests.size - fetchRequests.size
-      logInfo("Started " + numGets + " remote gets in " + 
Utils.getUsedTimeMs(startTime))
+      val numFetches = remoteRequests.size - fetchRequests.size
+      logInfo("Started " + numFetches + " remote fetches in" + 
Utils.getUsedTimeMs(startTime))
 
       // Get Local Blocks
       startTime = System.currentTimeMillis
@@ -327,7 +329,7 @@ object BlockFetcherIterator {
       }
 
       copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))
-      logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
+      logInfo("Started " + fetchRequestsSync.size + " remote fetches in " +
         Utils.getUsedTimeMs(startTime))
 
       // Get Local Blocks

http://git-wip-us.apache.org/repos/asf/spark/blob/2f24159c/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 02ba5ec..6d7d4f9 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -46,11 +46,12 @@ private[spark] class BlockManager(
     val master: BlockManagerMaster,
     val defaultSerializer: Serializer,
     maxMemory: Long,
-    val conf: SparkConf,
+    val _conf: SparkConf,
     securityManager: SecurityManager,
     mapOutputTracker: MapOutputTracker)
   extends Logging {
 
+  def conf = _conf
   val shuffleBlockManager = new ShuffleBlockManager(this)
   val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
     conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")))

http://git-wip-us.apache.org/repos/asf/spark/blob/2f24159c/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 36ee4bc..0ab9fad 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,8 @@ import org.apache.spark.util.Utils
 private class DiskStore(blockManager: BlockManager, diskManager: 
DiskBlockManager)
   extends BlockStore(blockManager) with Logging {
 
+  val minMemoryMapBytes = 
blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
+
   override def getSize(blockId: BlockId): Long = {
     diskManager.getBlockLocation(blockId).length
   }
@@ -94,12 +96,20 @@ private class DiskStore(blockManager: BlockManager, 
diskManager: DiskBlockManage
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     val segment = diskManager.getBlockLocation(blockId)
     val channel = new RandomAccessFile(segment.file, "r").getChannel()
-    val buffer = try {
-      channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
+
+    try {
+      // For small files, directly read rather than memory map
+      if (segment.length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(segment.length.toInt)
+        channel.read(buf, segment.offset)
+        buf.flip()
+        Some(buf)
+      } else {
+        Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
+      }
     } finally {
       channel.close()
     }
-    Some(buffer)
   }
 
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2f24159c/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5a55e7d..b678604 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -553,8 +553,7 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Return the string to tell how long has passed in seconds. The passing 
parameter should be in
-   * millisecond.
+   * Return the string to tell how long has passed in milliseconds.
    */
   def getUsedTimeMs(startTimeMs: Long): String = {
     " " + (System.currentTimeMillis - startTimeMs) + " ms"

http://git-wip-us.apache.org/repos/asf/spark/blob/2f24159c/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 907428d..00deecc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.storage
 
-import java.nio.ByteBuffer
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.Arrays
 
 import akka.actor._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
@@ -785,6 +788,53 @@ class BlockManagerSuite extends FunSuite with 
BeforeAndAfter with PrivateMethodT
     }
   }
 
+  test("reads of memory-mapped and non memory-mapped files are equivalent") {
+    val confKey = "spark.storage.memoryMapThreshold"
+
+    // Create a non-trivial (not all zeros) byte array
+    var counter = 0.toByte
+    def incr = {counter = (counter + 1).toByte; counter;}
+    val bytes = Array.fill[Byte](1000)(incr)
+    val byteBuffer = ByteBuffer.wrap(bytes)
+
+    val blockId = BlockId("rdd_1_2")
+
+    // This sequence of mocks makes these tests fairly brittle. It would
+    // be nice to refactor classes involved in disk storage in a way that
+    // allows for easier testing.
+    val blockManager = mock(classOf[BlockManager])
+    val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
+    when(shuffleBlockManager.conf).thenReturn(conf)
+    val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
+      System.getProperty("java.io.tmpdir"))
+
+    when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
+    val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
+    diskStoreMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
+    val mapped = diskStoreMapped.getBytes(blockId).get
+
+    when(blockManager.conf).thenReturn(conf.clone.set(confKey, (1000 * 
1000).toString))
+    val diskStoreNotMapped = new DiskStore(blockManager, diskBlockManager)
+    diskStoreNotMapped.putBytes(blockId, byteBuffer, StorageLevel.DISK_ONLY)
+    val notMapped = diskStoreNotMapped.getBytes(blockId).get
+
+    // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
+    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+      "Expected HeapByteBuffer for un-mapped read")
+    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer 
for mapped read")
+
+    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
+      val array = new Array[Byte](in.remaining())
+      in.get(array)
+      array
+    }
+
+    val mappedAsArray = arrayFromByteBuffer(mapped)
+    val notMappedAsArray = arrayFromByteBuffer(notMapped)
+    assert(Arrays.equals(mappedAsArray, bytes))
+    assert(Arrays.equals(notMappedAsArray, bytes))
+  }
+  
   test("updated block statuses") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 
1200, conf,
       securityMgr, mapOutputTracker)

http://git-wip-us.apache.org/repos/asf/spark/blob/2f24159c/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 8d34426..b078c7c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -132,6 +132,15 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>spark.storage.memoryMapThreshold</td>
+  <td>8192</td>
+  <td>
+    Size of a block, in bytes, above which Spark memory maps when reading a 
block from disk.
+    This prevents Spark from memory mapping very small blocks. In general, 
memory
+    mapping has high overhead for blocks close to or below the page size of 
the operating system.
+  </td>
+</tr>
+<tr>
   <td>spark.tachyonStore.baseDir</td>
   <td>System.getProperty("java.io.tmpdir")</td>
   <td>

Reply via email to