This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0ac516b [SPARK-25035][CORE] Avoiding memory mapping at disk-stored blocks replication 0ac516b is described below commit 0ac516bebd4cf647fe073af45298186103134f33 Author: “attilapiros” <piros.attila.zs...@gmail.com> AuthorDate: Mon Feb 25 11:42:55 2019 -0800 [SPARK-25035][CORE] Avoiding memory mapping at disk-stored blocks replication Before this PR, the method `BlockManager#putBlockDataAsStream()` (which is used during block replication, where the block data is received as a stream) read the whole block content into memory, even at the DISK_ONLY storage level. With this change, the received block data (which is temporarily stored in a file) is simply moved to the location backing the target block whenever the block is written to the disk store. This way a possible OOM error is avoided. To avoid code duplication, the method `doPutBytes` is refactored into the abstract class `BlockStoreUpdater`, whose `save()` template method is shared by two implementations: one for byte-buffer-based and one for temporary-file-based block store updates. Tested with the existing unit tests of `DistributedSuite` (the ones dealing with replication): - caching on disk, replicated (encryption = off) (with replication as stream) - caching on disk, replicated (encryption = on) (with replication as stream) - caching in memory, serialized, replicated (encryption = on) (with replication as stream) - caching in memory, serialized, replicated (encryption = off) (with replication as stream) - etc. Also tested with new unit tests exercising the `putBlockDataAsStream` method directly: - test putBlockDataAsStream with caching (encryption = off) - test putBlockDataAsStream with caching (encryption = on) - test putBlockDataAsStream with caching, serialized (encryption = off) - test putBlockDataAsStream with caching, serialized (encryption = on) - test putBlockDataAsStream with caching on disk (encryption = off) - test putBlockDataAsStream with caching on disk (encryption = on) Closes #23688 from attilapiros/SPARK-25035. 
Authored-by: “attilapiros” <piros.attila.zs...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../org/apache/spark/storage/BlockManager.scala | 315 +++++++++++++-------- .../scala/org/apache/spark/storage/DiskStore.scala | 30 +- .../apache/spark/storage/BlockManagerSuite.scala | 60 +++- 3 files changed, 263 insertions(+), 142 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 79e9ee7..09928e4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -33,6 +33,7 @@ import scala.util.Random import scala.util.control.NonFatal import com.codahale.metrics.{MetricRegistry, MetricSet} +import org.apache.commons.io.IOUtils import org.apache.spark._ import org.apache.spark.executor.DataReadMethod @@ -222,6 +223,187 @@ private[spark] class BlockManager( private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) /** + * Abstraction for storing blocks from bytes, whether they start in memory or on disk. + * + * @param blockSize the decrypted size of the block + */ + private abstract class BlockStoreUpdater[T]( + blockSize: Long, + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[T], + tellMaster: Boolean, + keepReadLock: Boolean) { + + /** + * Reads the block content into the memory. If the update of the block store is based on a + * temporary file this could lead to loading the whole file into a ChunkedByteBuffer. 
+ */ + protected def readToByteBuffer(): ChunkedByteBuffer + + protected def blockData(): BlockData + + protected def saveToDiskStore(): Unit + + private def saveDeserializedValuesToMemoryStore(inputStream: InputStream): Boolean = { + try { + val values = serializerManager.dataDeserializeStream(blockId, inputStream)(classTag) + memoryStore.putIteratorAsValues(blockId, values, classTag) match { + case Right(_) => true + case Left(iter) => + // If putting deserialized values in memory failed, we will put the bytes directly + // to disk, so we don't need this iterator and can close it to free resources + // earlier. + iter.close() + false + } + } finally { + IOUtils.closeQuietly(inputStream) + } + } + + private def saveSerializedValuesToMemoryStore(bytes: ChunkedByteBuffer): Boolean = { + val memoryMode = level.memoryMode + memoryStore.putBytes(blockId, blockSize, memoryMode, () => { + if (memoryMode == MemoryMode.OFF_HEAP && bytes.chunks.exists(!_.isDirect)) { + bytes.copy(Platform.allocateDirectBuffer) + } else { + bytes + } + }) + } + + /** + * Put the given data according to the given level in one of the block stores, replicating + * the values if necessary. + * + * If the block already exists, this method will not overwrite it. + * + * If keepReadLock is true, this method will hold the read lock when it returns (even if the + * block already exists). If false, this method will hold no locks when it returns. + * + * @return true if the block was already present or if the put succeeded, false otherwise. + */ + def save(): Boolean = { + doPut(blockId, level, classTag, tellMaster, keepReadLock) { info => + val startTimeNs = System.nanoTime() + + // Since we're storing bytes, initiate the replication before storing them locally. + // This is faster as data is already serialized and ready to send. + val replicationFuture = if (level.replication > 1) { + Future { + // This is a blocking action and should run in futureExecutionContext which is a cached + // thread pool. 
+ replicate(blockId, blockData(), level, classTag) + }(futureExecutionContext) + } else { + null + } + if (level.useMemory) { + // Put it in memory first, even if it also has useDisk set to true; + // We will drop it to disk later if the memory store can't hold it. + val putSucceeded = if (level.deserialized) { + saveDeserializedValuesToMemoryStore(blockData().toInputStream()) + } else { + saveSerializedValuesToMemoryStore(readToByteBuffer()) + } + if (!putSucceeded && level.useDisk) { + logWarning(s"Persisting block $blockId to disk instead.") + saveToDiskStore() + } + } else if (level.useDisk) { + saveToDiskStore() + } + val putBlockStatus = getCurrentBlockStatus(blockId, info) + val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid + if (blockWasSuccessfullyStored) { + // Now that the block is in either the memory or disk store, + // tell the master about it. + info.size = blockSize + if (tellMaster && info.tellMaster) { + reportBlockStatus(blockId, putBlockStatus) + } + addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus) + } + logDebug(s"Put block ${blockId} locally took ${Utils.getUsedTimeNs(startTimeNs)}") + if (level.replication > 1) { + // Wait for asynchronous replication to finish + try { + ThreadUtils.awaitReady(replicationFuture, Duration.Inf) + } catch { + case NonFatal(t) => + throw new Exception("Error occurred while waiting for replication to finish", t) + } + } + if (blockWasSuccessfullyStored) { + None + } else { + Some(blockSize) + } + }.isEmpty + } + } + + /** + * Helper for storing a block from bytes already in memory. + * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing + * so may corrupt or change the data stored by the `BlockManager`. 
+ */ + private case class ByteBufferBlockStoreUpdater[T]( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[T], + bytes: ChunkedByteBuffer, + tellMaster: Boolean = true, + keepReadLock: Boolean = false) + extends BlockStoreUpdater[T](bytes.size, blockId, level, classTag, tellMaster, keepReadLock) { + + override def readToByteBuffer(): ChunkedByteBuffer = bytes + + /** + * The ByteBufferBlockData wrapper is not disposed of to avoid releasing buffers that are + * owned by the caller. + */ + override def blockData(): BlockData = new ByteBufferBlockData(bytes, false) + + override def saveToDiskStore(): Unit = diskStore.putBytes(blockId, bytes) + + } + + /** + * Helper for storing a block based from bytes already in a local temp file. + */ + private case class TempFileBasedBlockStoreUpdater[T]( + blockId: BlockId, + level: StorageLevel, + classTag: ClassTag[T], + tmpFile: File, + blockSize: Long, + tellMaster: Boolean = true, + keepReadLock: Boolean = false) + extends BlockStoreUpdater[T](blockSize, blockId, level, classTag, tellMaster, keepReadLock) { + + override def readToByteBuffer(): ChunkedByteBuffer = { + val allocator = level.memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + blockData().toChunkedByteBuffer(allocator) + } + + override def blockData(): BlockData = diskStore.getBytes(tmpFile, blockSize) + + override def saveToDiskStore(): Unit = diskStore.moveFileToBlock(tmpFile, blockSize, blockId) + + override def save(): Boolean = { + val res = super.save() + tmpFile.delete() + res + } + + } + + /** * Initializes the BlockManager with the given appId. This is not performed in the constructor as * the appId may not be known at BlockManager instantiation time (in particular for the driver, * where it is only learned after registration with the TaskScheduler). 
@@ -412,10 +594,7 @@ private[spark] class BlockManager( blockId: BlockId, level: StorageLevel, classTag: ClassTag[_]): StreamCallbackWithID = { - // TODO if we're going to only put the data in the disk store, we should just write it directly - // to the final location, but that would require a deeper refactor of this code. So instead - // we just write to a temp file, and call putBytes on the data in that file. - val tmpFile = diskBlockManager.createTempLocalBlock()._2 + val (_, tmpFile) = diskBlockManager.createTempLocalBlock() val channel = new CountingWritableChannel( Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile)))) logTrace(s"Streaming block $blockId to tmp file $tmpFile") @@ -431,28 +610,11 @@ private[spark] class BlockManager( override def onComplete(streamId: String): Unit = { logTrace(s"Done receiving block $blockId, now putting into local blockManager") - // Read the contents of the downloaded file as a buffer to put into the blockManager. // Note this is all happening inside the netty thread as soon as it reads the end of the // stream. channel.close() - // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up - // using a lot of memory here. We'll read the whole file into a regular - // byte buffer and OOM. We could at least read the tmp file as a stream. 
- val buffer = securityManager.getIOEncryptionKey() match { - case Some(key) => - // we need to pass in the size of the unencrypted block - val blockSize = channel.getCount - val allocator = level.memoryMode match { - case MemoryMode.ON_HEAP => ByteBuffer.allocate _ - case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ - } - new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator) - - case None => - ChunkedByteBuffer.fromFile(tmpFile) - } - putBytes(blockId, buffer, level)(classTag) - tmpFile.delete() + val blockSize = channel.getCount + TempFileBasedBlockStoreUpdater(blockId, level, classTag, tmpFile, blockSize).save() } override def onFailure(streamId: String, cause: Throwable): Unit = { @@ -953,111 +1115,14 @@ private[spark] class BlockManager( level: StorageLevel, tellMaster: Boolean = true): Boolean = { require(bytes != null, "Bytes is null") - doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster) - } - - /** - * Put the given bytes according to the given level in one of the block stores, replicating - * the values if necessary. - * - * If the block already exists, this method will not overwrite it. - * - * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing - * so may corrupt or change the data stored by the `BlockManager`. - * - * @param keepReadLock if true, this method will hold the read lock when it returns (even if the - * block already exists). If false, this method will hold no locks when it - * returns. - * @return true if the block was already present or if the put succeeded, false otherwise. 
- */ - private def doPutBytes[T]( - blockId: BlockId, - bytes: ChunkedByteBuffer, - level: StorageLevel, - classTag: ClassTag[T], - tellMaster: Boolean = true, - keepReadLock: Boolean = false): Boolean = { - doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info => - val startTimeNs = System.nanoTime() - // Since we're storing bytes, initiate the replication before storing them locally. - // This is faster as data is already serialized and ready to send. - val replicationFuture = if (level.replication > 1) { - Future { - // This is a blocking action and should run in futureExecutionContext which is a cached - // thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing - // buffers that are owned by the caller. - replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag) - }(futureExecutionContext) - } else { - null - } - - val size = bytes.size - - if (level.useMemory) { - // Put it in memory first, even if it also has useDisk set to true; - // We will drop it to disk later if the memory store can't hold it. - val putSucceeded = if (level.deserialized) { - val values = - serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag) - memoryStore.putIteratorAsValues(blockId, values, classTag) match { - case Right(_) => true - case Left(iter) => - // If putting deserialized values in memory failed, we will put the bytes directly to - // disk, so we don't need this iterator and can close it to free resources earlier. 
- iter.close() - false - } - } else { - val memoryMode = level.memoryMode - memoryStore.putBytes(blockId, size, memoryMode, () => { - if (memoryMode == MemoryMode.OFF_HEAP && - bytes.chunks.exists(buffer => !buffer.isDirect)) { - bytes.copy(Platform.allocateDirectBuffer) - } else { - bytes - } - }) - } - if (!putSucceeded && level.useDisk) { - logWarning(s"Persisting block $blockId to disk instead.") - diskStore.putBytes(blockId, bytes) - } - } else if (level.useDisk) { - diskStore.putBytes(blockId, bytes) - } - - val putBlockStatus = getCurrentBlockStatus(blockId, info) - val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid - if (blockWasSuccessfullyStored) { - // Now that the block is in either the memory or disk store, - // tell the master about it. - info.size = size - if (tellMaster && info.tellMaster) { - reportBlockStatus(blockId, putBlockStatus) - } - addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus) - } - logDebug(s"Put block ${blockId} locally took ${Utils.getUsedTimeNs(startTimeNs)}") - if (level.replication > 1) { - // Wait for asynchronous replication to finish - try { - ThreadUtils.awaitReady(replicationFuture, Duration.Inf) - } catch { - case NonFatal(t) => - throw new Exception("Error occurred while waiting for replication to finish", t) - } - } - if (blockWasSuccessfullyStored) { - None - } else { - Some(bytes) - } - }.isEmpty + val blockStoreUpdater = + ByteBufferBlockStoreUpdater(blockId, level, implicitly[ClassTag[T]], bytes, tellMaster) + blockStoreUpdater.save() } /** - * Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]]. + * Helper method used to abstract common code from [[BlockStoreUpdater.save()]] + * and [[doPutIterator()]]. * * @param putBody a function which attempts the actual put() and returns None on success * or Some on failure. 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index aefa2ae..fbda491 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer import com.google.common.io.Closeables import io.netty.channel.DefaultFileRegion +import org.apache.commons.io.FileUtils import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.{config, Logging} @@ -95,18 +96,17 @@ private[spark] class DiskStore( } def getBytes(blockId: BlockId): BlockData = { - val file = diskManager.getFile(blockId.name) - val blockSize = getSize(blockId) + getBytes(diskManager.getFile(blockId.name), getSize(blockId)) + } - securityManager.getIOEncryptionKey() match { - case Some(key) => - // Encrypted blocks cannot be memory mapped; return a special object that does decryption - // and provides InputStream / FileRegion implementations for reading the data. - new EncryptedBlockData(file, blockSize, conf, key) + def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match { + case Some(key) => + // Encrypted blocks cannot be memory mapped; return a special object that does decryption + // and provides InputStream / FileRegion implementations for reading the data. 
+ new EncryptedBlockData(f, blockSize, conf, key) - case _ => - new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize) - } + case _ => + new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize) } def remove(blockId: BlockId): Boolean = { @@ -123,6 +123,16 @@ private[spark] class DiskStore( } } + /** + * @param blockSize if encryption is configured, the file is assumed to already be encrypted and + * blockSize should be the decrypted size + */ + def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: BlockId): Unit = { + blockSizes.put(targetBlockId, blockSize) + val targetFile = diskManager.getFile(targetBlockId.name) + FileUtils.moveFile(sourceFile, targetFile) + } + def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 40c6424..ac35ac3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -80,6 +80,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value) def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId) + private def init(sparkConf: SparkConf): Unit = { + sparkConf + .set("spark.app.id", "test") + .set(IS_TESTING, true) + .set(MEMORY_FRACTION, 1.0) + .set(MEMORY_STORAGE_FRACTION, 0.999) + .set("spark.kryoserializer.buffer", "1m") + .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) + } + private def makeBlockManager( maxMem: Long, name: String = SparkContext.DRIVER_IDENTIFIER, @@ -113,12 +123,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case 
System.setProperty("os.arch", "amd64") conf = new SparkConf(false) - .set("spark.app.id", "test") - .set(IS_TESTING, true) - .set(MEMORY_FRACTION, 1.0) - .set(MEMORY_STORAGE_FRACTION, 0.999) - .set("spark.kryoserializer.buffer", "1m") - .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) + init(conf) rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set(DRIVER_PORT, rpcEnv.address.port) @@ -890,7 +895,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) - memoryManager.setMemoryStore(store.memoryStore) store.initialize("app-id") // The put should fail since a1 is not serializable. @@ -906,6 +910,48 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE } } + def testPutBlockDataAsStream(blockManager: BlockManager, storageLevel: StorageLevel): Unit = { + val message = "message" + val ser = serializer.newInstance().serialize(message).array() + val blockId = new RDDBlockId(0, 0) + val streamCallbackWithId = + blockManager.putBlockDataAsStream(blockId, storageLevel, ClassTag(message.getClass)) + streamCallbackWithId.onData("0", ByteBuffer.wrap(ser)) + streamCallbackWithId.onComplete("0") + val blockStatusOption = blockManager.getStatus(blockId) + assert(!blockStatusOption.isEmpty) + val blockStatus = blockStatusOption.get + assert((blockStatus.diskSize > 0) === !storageLevel.useMemory) + assert((blockStatus.memSize > 0) === storageLevel.useMemory) + assert(blockManager.getBlockData(blockId).nioByteBuffer().array() === ser) + } + + Seq( + "caching" -> StorageLevel.MEMORY_ONLY, + "caching, serialized" -> StorageLevel.MEMORY_ONLY_SER, + "caching on disk" -> StorageLevel.DISK_ONLY + ).foreach { case (name, storageLevel) => + encryptionTest(s"test putBlockDataAsStream with $name") { conf => + init(conf) + val ioEncryptionKey = 
+ if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None + val securityMgr = new SecurityManager(conf, ioEncryptionKey) + val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) + val transfer = + new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) + val memoryManager = UnifiedMemoryManager(conf, numCores = 1) + val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, + serializerManager, conf, memoryManager, mapOutputTracker, + shuffleManager, transfer, securityMgr, 0) + try { + blockManager.initialize("app-id") + testPutBlockDataAsStream(blockManager, storageLevel) + } finally { + blockManager.stop() + } + } + } + test("turn off updated block statuses") { val conf = new SparkConf() conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org