This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ac516b  [SPARK-25035][CORE] Avoiding memory mapping at disk-stored 
blocks replication
0ac516b is described below

commit 0ac516bebd4cf647fe073af45298186103134f33
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Mon Feb 25 11:42:55 2019 -0800

    [SPARK-25035][CORE] Avoiding memory mapping at disk-stored blocks 
replication
    
    Before this PR, the method `BlockManager#putBlockDataAsStream()` (which is 
used during block replication, where the block data is received as a stream) 
read the whole block content into memory, even at the DISK_ONLY storage level.
    
    With this change, the received block data (which was temporarily stored in a 
file) is simply moved into the right location backing the target block. 
This way a possible OOM error is avoided.
    
    To avoid code duplication, the method `doPutBytes` is refactored into a 
template-method class called `BlockStoreUpdater`, which has separate 
implementations for byte-buffer-based and temporary-file-based block store 
updates.
    
    Tested with the existing unit tests of `DistributedSuite` (the ones dealing 
with replication):
    - caching on disk, replicated (encryption = off) (with replication as 
stream)
    - caching on disk, replicated (encryption = on) (with replication as stream)
    - caching in memory, serialized, replicated (encryption = on) (with 
replication as stream)
    - caching in memory, serialized, replicated (encryption = off) (with 
replication as stream)
    - etc.
    
    And with new unit tests testing the `putBlockDataAsStream` method directly:
    - test putBlockDataAsStream with caching (encryption = off)
    - test putBlockDataAsStream with caching (encryption = on)
    - test putBlockDataAsStream with caching, serialized (encryption = off)
    - test putBlockDataAsStream with caching, serialized (encryption = on)
    - test putBlockDataAsStream with caching on disk (encryption = off)
    - test putBlockDataAsStream with caching on disk (encryption = on)
    
    Closes #23688 from attilapiros/SPARK-25035.
    
    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 315 +++++++++++++--------
 .../scala/org/apache/spark/storage/DiskStore.scala |  30 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  60 +++-
 3 files changed, 263 insertions(+), 142 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 79e9ee7..09928e4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -33,6 +33,7 @@ import scala.util.Random
 import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
@@ -222,6 +223,187 @@ private[spark] class BlockManager(
   private val maxRemoteBlockToMem = 
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
 
   /**
+   * Abstraction for storing blocks from bytes, whether they start in memory 
or on disk.
+   *
+   * @param blockSize the decrypted size of the block
+   */
+  private abstract class BlockStoreUpdater[T](
+      blockSize: Long,
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      tellMaster: Boolean,
+      keepReadLock: Boolean) {
+
+    /**
+     *  Reads the block content into the memory. If the update of the block 
store is based on a
+     *  temporary file this could lead to loading the whole file into a 
ChunkedByteBuffer.
+     */
+    protected def readToByteBuffer(): ChunkedByteBuffer
+
+    protected def blockData(): BlockData
+
+    protected def saveToDiskStore(): Unit
+
+    private def saveDeserializedValuesToMemoryStore(inputStream: InputStream): 
Boolean = {
+      try {
+        val values = serializerManager.dataDeserializeStream(blockId, 
inputStream)(classTag)
+        memoryStore.putIteratorAsValues(blockId, values, classTag) match {
+          case Right(_) => true
+          case Left(iter) =>
+            // If putting deserialized values in memory failed, we will put 
the bytes directly
+            // to disk, so we don't need this iterator and can close it to 
free resources
+            // earlier.
+            iter.close()
+            false
+        }
+      } finally {
+        IOUtils.closeQuietly(inputStream)
+      }
+    }
+
+    private def saveSerializedValuesToMemoryStore(bytes: ChunkedByteBuffer): 
Boolean = {
+      val memoryMode = level.memoryMode
+      memoryStore.putBytes(blockId, blockSize, memoryMode, () => {
+        if (memoryMode == MemoryMode.OFF_HEAP && 
bytes.chunks.exists(!_.isDirect)) {
+          bytes.copy(Platform.allocateDirectBuffer)
+        } else {
+          bytes
+        }
+      })
+    }
+
+    /**
+     * Put the given data according to the given level in one of the block 
stores, replicating
+     * the values if necessary.
+     *
+     * If the block already exists, this method will not overwrite it.
+     *
+     * If keepReadLock is true, this method will hold the read lock when it 
returns (even if the
+     * block already exists). If false, this method will hold no locks when it 
returns.
+     *
+     * @return true if the block was already present or if the put succeeded, 
false otherwise.
+     */
+     def save(): Boolean = {
+      doPut(blockId, level, classTag, tellMaster, keepReadLock) { info =>
+        val startTimeNs = System.nanoTime()
+
+        // Since we're storing bytes, initiate the replication before storing 
them locally.
+        // This is faster as data is already serialized and ready to send.
+        val replicationFuture = if (level.replication > 1) {
+          Future {
+            // This is a blocking action and should run in 
futureExecutionContext which is a cached
+            // thread pool.
+            replicate(blockId, blockData(), level, classTag)
+          }(futureExecutionContext)
+        } else {
+          null
+        }
+        if (level.useMemory) {
+          // Put it in memory first, even if it also has useDisk set to true;
+          // We will drop it to disk later if the memory store can't hold it.
+          val putSucceeded = if (level.deserialized) {
+            saveDeserializedValuesToMemoryStore(blockData().toInputStream())
+          } else {
+            saveSerializedValuesToMemoryStore(readToByteBuffer())
+          }
+          if (!putSucceeded && level.useDisk) {
+            logWarning(s"Persisting block $blockId to disk instead.")
+            saveToDiskStore()
+          }
+        } else if (level.useDisk) {
+          saveToDiskStore()
+        }
+        val putBlockStatus = getCurrentBlockStatus(blockId, info)
+        val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+        if (blockWasSuccessfullyStored) {
+          // Now that the block is in either the memory or disk store,
+          // tell the master about it.
+          info.size = blockSize
+          if (tellMaster && info.tellMaster) {
+            reportBlockStatus(blockId, putBlockStatus)
+          }
+          addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
+        }
+        logDebug(s"Put block ${blockId} locally took 
${Utils.getUsedTimeNs(startTimeNs)}")
+        if (level.replication > 1) {
+          // Wait for asynchronous replication to finish
+          try {
+            ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
+          } catch {
+            case NonFatal(t) =>
+              throw new Exception("Error occurred while waiting for 
replication to finish", t)
+          }
+        }
+        if (blockWasSuccessfullyStored) {
+          None
+        } else {
+          Some(blockSize)
+        }
+      }.isEmpty
+    }
+  }
+
+  /**
+   * Helper for storing a block from bytes already in memory.
+   * '''Important!''' Callers must not mutate or release the data buffer 
underlying `bytes`. Doing
+   * so may corrupt or change the data stored by the `BlockManager`.
+   */
+  private case class ByteBufferBlockStoreUpdater[T](
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      bytes: ChunkedByteBuffer,
+      tellMaster: Boolean = true,
+      keepReadLock: Boolean = false)
+    extends BlockStoreUpdater[T](bytes.size, blockId, level, classTag, 
tellMaster, keepReadLock) {
+
+    override def readToByteBuffer(): ChunkedByteBuffer = bytes
+
+    /**
+     * The ByteBufferBlockData wrapper is not disposed of to avoid releasing 
buffers that are
+     * owned by the caller.
+     */
+    override def blockData(): BlockData = new ByteBufferBlockData(bytes, false)
+
+    override def saveToDiskStore(): Unit = diskStore.putBytes(blockId, bytes)
+
+  }
+
+  /**
+   * Helper for storing a block based from bytes already in a local temp file.
+   */
+  private case class TempFileBasedBlockStoreUpdater[T](
+      blockId: BlockId,
+      level: StorageLevel,
+      classTag: ClassTag[T],
+      tmpFile: File,
+      blockSize: Long,
+      tellMaster: Boolean = true,
+      keepReadLock: Boolean = false)
+    extends BlockStoreUpdater[T](blockSize, blockId, level, classTag, 
tellMaster, keepReadLock) {
+
+    override def readToByteBuffer(): ChunkedByteBuffer = {
+      val allocator = level.memoryMode match {
+        case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+        case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+      }
+      blockData().toChunkedByteBuffer(allocator)
+    }
+
+    override def blockData(): BlockData = diskStore.getBytes(tmpFile, 
blockSize)
+
+    override def saveToDiskStore(): Unit = diskStore.moveFileToBlock(tmpFile, 
blockSize, blockId)
+
+    override def save(): Boolean = {
+      val res = super.save()
+      tmpFile.delete()
+      res
+    }
+
+  }
+
+  /**
    * Initializes the BlockManager with the given appId. This is not performed 
in the constructor as
    * the appId may not be known at BlockManager instantiation time (in 
particular for the driver,
    * where it is only learned after registration with the TaskScheduler).
@@ -412,10 +594,7 @@ private[spark] class BlockManager(
       blockId: BlockId,
       level: StorageLevel,
       classTag: ClassTag[_]): StreamCallbackWithID = {
-    // TODO if we're going to only put the data in the disk store, we should 
just write it directly
-    // to the final location, but that would require a deeper refactor of this 
code.  So instead
-    // we just write to a temp file, and call putBytes on the data in that 
file.
-    val tmpFile = diskBlockManager.createTempLocalBlock()._2
+    val (_, tmpFile) = diskBlockManager.createTempLocalBlock()
     val channel = new CountingWritableChannel(
       Channels.newChannel(serializerManager.wrapForEncryption(new 
FileOutputStream(tmpFile))))
     logTrace(s"Streaming block $blockId to tmp file $tmpFile")
@@ -431,28 +610,11 @@ private[spark] class BlockManager(
 
       override def onComplete(streamId: String): Unit = {
         logTrace(s"Done receiving block $blockId, now putting into local 
blockManager")
-        // Read the contents of the downloaded file as a buffer to put into 
the blockManager.
         // Note this is all happening inside the netty thread as soon as it 
reads the end of the
         // stream.
         channel.close()
-        // TODO SPARK-25035 Even if we're only going to write the data to disk 
after this, we end up
-        // using a lot of memory here. We'll read the whole file into a regular
-        // byte buffer and OOM.  We could at least read the tmp file as a 
stream.
-        val buffer = securityManager.getIOEncryptionKey() match {
-          case Some(key) =>
-            // we need to pass in the size of the unencrypted block
-            val blockSize = channel.getCount
-            val allocator = level.memoryMode match {
-              case MemoryMode.ON_HEAP => ByteBuffer.allocate _
-              case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
-            }
-            new EncryptedBlockData(tmpFile, blockSize, conf, 
key).toChunkedByteBuffer(allocator)
-
-          case None =>
-            ChunkedByteBuffer.fromFile(tmpFile)
-        }
-        putBytes(blockId, buffer, level)(classTag)
-        tmpFile.delete()
+        val blockSize = channel.getCount
+        TempFileBasedBlockStoreUpdater(blockId, level, classTag, tmpFile, 
blockSize).save()
       }
 
       override def onFailure(streamId: String, cause: Throwable): Unit = {
@@ -953,111 +1115,14 @@ private[spark] class BlockManager(
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
-  }
-
-  /**
-   * Put the given bytes according to the given level in one of the block 
stores, replicating
-   * the values if necessary.
-   *
-   * If the block already exists, this method will not overwrite it.
-   *
-   * '''Important!''' Callers must not mutate or release the data buffer 
underlying `bytes`. Doing
-   * so may corrupt or change the data stored by the `BlockManager`.
-   *
-   * @param keepReadLock if true, this method will hold the read lock when it 
returns (even if the
-   *                     block already exists). If false, this method will 
hold no locks when it
-   *                     returns.
-   * @return true if the block was already present or if the put succeeded, 
false otherwise.
-   */
-  private def doPutBytes[T](
-      blockId: BlockId,
-      bytes: ChunkedByteBuffer,
-      level: StorageLevel,
-      classTag: ClassTag[T],
-      tellMaster: Boolean = true,
-      keepReadLock: Boolean = false): Boolean = {
-    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { info =>
-      val startTimeNs = System.nanoTime()
-      // Since we're storing bytes, initiate the replication before storing 
them locally.
-      // This is faster as data is already serialized and ready to send.
-      val replicationFuture = if (level.replication > 1) {
-        Future {
-          // This is a blocking action and should run in 
futureExecutionContext which is a cached
-          // thread pool. The ByteBufferBlockData wrapper is not disposed of 
to avoid releasing
-          // buffers that are owned by the caller.
-          replicate(blockId, new ByteBufferBlockData(bytes, false), level, 
classTag)
-        }(futureExecutionContext)
-      } else {
-        null
-      }
-
-      val size = bytes.size
-
-      if (level.useMemory) {
-        // Put it in memory first, even if it also has useDisk set to true;
-        // We will drop it to disk later if the memory store can't hold it.
-        val putSucceeded = if (level.deserialized) {
-          val values =
-            serializerManager.dataDeserializeStream(blockId, 
bytes.toInputStream())(classTag)
-          memoryStore.putIteratorAsValues(blockId, values, classTag) match {
-            case Right(_) => true
-            case Left(iter) =>
-              // If putting deserialized values in memory failed, we will put 
the bytes directly to
-              // disk, so we don't need this iterator and can close it to free 
resources earlier.
-              iter.close()
-              false
-          }
-        } else {
-          val memoryMode = level.memoryMode
-          memoryStore.putBytes(blockId, size, memoryMode, () => {
-            if (memoryMode == MemoryMode.OFF_HEAP &&
-                bytes.chunks.exists(buffer => !buffer.isDirect)) {
-              bytes.copy(Platform.allocateDirectBuffer)
-            } else {
-              bytes
-            }
-          })
-        }
-        if (!putSucceeded && level.useDisk) {
-          logWarning(s"Persisting block $blockId to disk instead.")
-          diskStore.putBytes(blockId, bytes)
-        }
-      } else if (level.useDisk) {
-        diskStore.putBytes(blockId, bytes)
-      }
-
-      val putBlockStatus = getCurrentBlockStatus(blockId, info)
-      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
-      if (blockWasSuccessfullyStored) {
-        // Now that the block is in either the memory or disk store,
-        // tell the master about it.
-        info.size = size
-        if (tellMaster && info.tellMaster) {
-          reportBlockStatus(blockId, putBlockStatus)
-        }
-        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
-      }
-      logDebug(s"Put block ${blockId} locally took 
${Utils.getUsedTimeNs(startTimeNs)}")
-      if (level.replication > 1) {
-        // Wait for asynchronous replication to finish
-        try {
-          ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
-        } catch {
-          case NonFatal(t) =>
-            throw new Exception("Error occurred while waiting for replication 
to finish", t)
-        }
-      }
-      if (blockWasSuccessfullyStored) {
-        None
-      } else {
-        Some(bytes)
-      }
-    }.isEmpty
+    val blockStoreUpdater =
+      ByteBufferBlockStoreUpdater(blockId, level, implicitly[ClassTag[T]], 
bytes, tellMaster)
+    blockStoreUpdater.save()
   }
 
   /**
-   * Helper method used to abstract common code from [[doPutBytes()]] and 
[[doPutIterator()]].
+   * Helper method used to abstract common code from 
[[BlockStoreUpdater.save()]]
+   * and [[doPutIterator()]].
    *
    * @param putBody a function which attempts the actual put() and returns 
None on success
    *                or Some on failure.
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index aefa2ae..fbda491 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer
 
 import com.google.common.io.Closeables
 import io.netty.channel.DefaultFileRegion
+import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.{config, Logging}
@@ -95,18 +96,17 @@ private[spark] class DiskStore(
   }
 
   def getBytes(blockId: BlockId): BlockData = {
-    val file = diskManager.getFile(blockId.name)
-    val blockSize = getSize(blockId)
+    getBytes(diskManager.getFile(blockId.name), getSize(blockId))
+  }
 
-    securityManager.getIOEncryptionKey() match {
-      case Some(key) =>
-        // Encrypted blocks cannot be memory mapped; return a special object 
that does decryption
-        // and provides InputStream / FileRegion implementations for reading 
the data.
-        new EncryptedBlockData(file, blockSize, conf, key)
+  def getBytes(f: File, blockSize: Long): BlockData = 
securityManager.getIOEncryptionKey() match {
+    case Some(key) =>
+      // Encrypted blocks cannot be memory mapped; return a special object 
that does decryption
+      // and provides InputStream / FileRegion implementations for reading the 
data.
+      new EncryptedBlockData(f, blockSize, conf, key)
 
-      case _ =>
-        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, 
blockSize)
-    }
+    case _ =>
+      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize)
   }
 
   def remove(blockId: BlockId): Boolean = {
@@ -123,6 +123,16 @@ private[spark] class DiskStore(
     }
   }
 
+  /**
+   * @param blockSize if encryption is configured, the file is assumed to 
already be encrypted and
+   *                  blockSize should be the decrypted size
+   */
+  def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: 
BlockId): Unit = {
+    blockSizes.put(targetBlockId, blockSize)
+    val targetFile = diskManager.getFile(targetBlockId.name)
+    FileUtils.moveFile(sourceFile, targetFile)
+  }
+
   def contains(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     file.exists()
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 40c6424..ac35ac3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -80,6 +80,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
 
+  private def init(sparkConf: SparkConf): Unit = {
+    sparkConf
+      .set("spark.app.id", "test")
+      .set(IS_TESTING, true)
+      .set(MEMORY_FRACTION, 1.0)
+      .set(MEMORY_STORAGE_FRACTION, 0.999)
+      .set("spark.kryoserializer.buffer", "1m")
+      .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+  }
+
   private def makeBlockManager(
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER,
@@ -113,12 +123,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     // Set the arch to 64-bit and compressedOops to true to get a 
deterministic test-case
     System.setProperty("os.arch", "amd64")
     conf = new SparkConf(false)
-      .set("spark.app.id", "test")
-      .set(IS_TESTING, true)
-      .set(MEMORY_FRACTION, 1.0)
-      .set(MEMORY_STORAGE_FRACTION, 0.999)
-      .set("spark.kryoserializer.buffer", "1m")
-      .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+    init(conf)
 
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set(DRIVER_PORT, rpcEnv.address.port)
@@ -890,7 +895,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, 
master,
       serializerManager, conf, memoryManager, mapOutputTracker,
       shuffleManager, transfer, securityMgr, 0)
-    memoryManager.setMemoryStore(store.memoryStore)
     store.initialize("app-id")
 
     // The put should fail since a1 is not serializable.
@@ -906,6 +910,48 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     }
   }
 
+  def testPutBlockDataAsStream(blockManager: BlockManager, storageLevel: 
StorageLevel): Unit = {
+    val message = "message"
+    val ser = serializer.newInstance().serialize(message).array()
+    val blockId = new RDDBlockId(0, 0)
+    val streamCallbackWithId =
+      blockManager.putBlockDataAsStream(blockId, storageLevel, 
ClassTag(message.getClass))
+    streamCallbackWithId.onData("0", ByteBuffer.wrap(ser))
+    streamCallbackWithId.onComplete("0")
+    val blockStatusOption = blockManager.getStatus(blockId)
+    assert(!blockStatusOption.isEmpty)
+    val blockStatus = blockStatusOption.get
+    assert((blockStatus.diskSize > 0) === !storageLevel.useMemory)
+    assert((blockStatus.memSize > 0) === storageLevel.useMemory)
+    assert(blockManager.getBlockData(blockId).nioByteBuffer().array() === ser)
+  }
+
+  Seq(
+    "caching" -> StorageLevel.MEMORY_ONLY,
+    "caching, serialized" -> StorageLevel.MEMORY_ONLY_SER,
+    "caching on disk" -> StorageLevel.DISK_ONLY
+  ).foreach { case (name, storageLevel) =>
+    encryptionTest(s"test putBlockDataAsStream with $name") { conf =>
+      init(conf)
+      val ioEncryptionKey =
+        if (conf.get(IO_ENCRYPTION_ENABLED)) 
Some(CryptoStreamUtils.createKey(conf)) else None
+      val securityMgr = new SecurityManager(conf, ioEncryptionKey)
+      val serializerManager = new SerializerManager(serializer, conf, 
ioEncryptionKey)
+      val transfer =
+        new NettyBlockTransferService(conf, securityMgr, "localhost", 
"localhost", 0, 1)
+      val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+      val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, 
rpcEnv, master,
+        serializerManager, conf, memoryManager, mapOutputTracker,
+        shuffleManager, transfer, securityMgr, 0)
+      try {
+        blockManager.initialize("app-id")
+        testPutBlockDataAsStream(blockManager, storageLevel)
+      } finally {
+        blockManager.stop()
+      }
+    }
+  }
+
   test("turn off updated block statuses") {
     val conf = new SparkConf()
     conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to