Repository: spark
Updated Branches:
  refs/heads/master 017cdf2be -> e52e597db


[SPARK-13659] Refactor BlockStore put*() APIs to remove returnValues

In preparation for a larger refactoring, this patch removes the confusing
`returnValues` option from the BlockStore put() APIs: returning the value is
only useful in one place (caching); in other situations, such as block
replication, it's simpler to put() and then get().
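
Concretely, the BlockStore API moves from returning a PutResult (which could
carry the stored data back to the caller) to returning either the stored size
or, on failure, the original iterator. A rough before/after sketch in Scala,
condensed from the signatures touched by this patch:

  // Before: callers opted in to getting the data back.
  def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult

  // After: no returnValues. putBytes() returns Unit; putIterator() returns
  // Right(size) on success or Left(iterator) if the values could not be stored.
  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit

  def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel): Either[Iterator[Any], Long]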

As part of this change, I needed to refactor `BlockManager.doPut()`'s block 
replication code. I also changed `doPut()` to access the memory and disk stores 
directly rather than calling them through the BlockStore interface; this is in 
anticipation of a followup patch to remove the BlockStore interface so that the 
disk store can expose a binary-data-oriented API which is not concerned with 
Java objects or serialization.
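
For reference, `doPut()` now reports its outcome with a small sealed trait
rather than an `Option[PutResult]`; callers compare the result against
`DoPutSucceeded`, and the caching path unwraps `DoPutIteratorFailed` to recover
the input iterator. A condensed sketch of that shape (the call site below is
simplified from the actual code in this patch):

  private sealed trait DoPutResult
  private case object DoPutSucceeded extends DoPutResult
  private case object DoPutBytesFailed extends DoPutResult
  private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult

  // Simplified caller: anything other than success means the put failed, and a
  // failed iterator-based put hands the unconsumed values back to the caller.
  doPut(blockId, IteratorValues(() => values), level, tellMaster) match {
    case DoPutSucceeded            => true   // stored, or the block already existed
    case DoPutIteratorFailed(iter) => false  // caller may still consume `iter`
    case DoPutBytesFailed          => false
  }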

These changes should be covered by the existing storage unit tests. The best 
way to review this patch is probably to look at the individual commits, all of 
which are small and have useful descriptions to guide the review.

/cc davies for review.

Author: Josh Rosen <joshro...@databricks.com>

Closes #11502 from JoshRosen/remove-returnvalues.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e52e597d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e52e597d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e52e597d

Branch: refs/heads/master
Commit: e52e597db48d069b98c1d404b221d3365f38fbb8
Parents: 017cdf2
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Mar 7 21:50:01 2016 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Mar 7 21:50:01 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 338 ++++++++++---------
 .../org/apache/spark/storage/BlockStore.scala   |  16 +-
 .../org/apache/spark/storage/DiskStore.scala    |  19 +-
 .../org/apache/spark/storage/MemoryStore.scala  |  39 +--
 .../org/apache/spark/storage/PutResult.scala    |  32 --
 .../spark/storage/BlockManagerSuite.scala       |  52 ++-
 6 files changed, 223 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e52e597d/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b59191b..dcf359e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -52,6 +52,12 @@ private[spark] class BlockResult(
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
+// Class for representing return value of doPut()
+private sealed trait DoPutResult
+private case object DoPutSucceeded extends DoPutResult
+private case object DoPutBytesFailed extends DoPutResult
+private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
+
 /**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
@@ -432,98 +438,108 @@ private[spark] class BlockManager(
         logDebug(s"Block $blockId was not found")
         None
       case Some(info) =>
-        val level = info.level
-        logDebug(s"Level for block $blockId is $level")
-
-        // Look for the block in memory
-        if (level.useMemory) {
-          logDebug(s"Getting block $blockId from memory")
-          val result = if (asBlockResult) {
-            memoryStore.getValues(blockId).map { iter =>
-              val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
-              new BlockResult(ci, DataReadMethod.Memory, info.size)
-            }
-          } else {
-            memoryStore.getBytes(blockId)
-          }
-          result match {
-            case Some(values) =>
-              return result
-            case None =>
-              logDebug(s"Block $blockId not found in memory")
-          }
+        doGetLocal(blockId, info, asBlockResult)
+    }
+  }
+
+  /**
+   * Get a local block from the block manager.
+   * Assumes that the caller holds a read lock on the block.
+   */
+  private def doGetLocal(
+      blockId: BlockId,
+      info: BlockInfo,
+      asBlockResult: Boolean): Option[Any] = {
+    val level = info.level
+    logDebug(s"Level for block $blockId is $level")
+
+    // Look for the block in memory
+    if (level.useMemory) {
+      logDebug(s"Getting block $blockId from memory")
+      val result = if (asBlockResult) {
+        memoryStore.getValues(blockId).map { iter =>
+          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
+          new BlockResult(ci, DataReadMethod.Memory, info.size)
         }
+      } else {
+        memoryStore.getBytes(blockId)
+      }
+      result match {
+        case Some(values) =>
+          return result
+        case None =>
+          logDebug(s"Block $blockId not found in memory")
+      }
+    }
 
-        // Look for block on disk, potentially storing it back in memory if required
-        if (level.useDisk) {
-          logDebug(s"Getting block $blockId from disk")
-          val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-            case Some(b) => b
-            case None =>
-              releaseLock(blockId)
-              throw new BlockException(
-                blockId, s"Block $blockId not found on disk, though it should be")
-          }
-          assert(0 == bytes.position())
-
-          if (!level.useMemory) {
-            // If the block shouldn't be stored in memory, we can just return it
-            if (asBlockResult) {
-              val iter = CompletionIterator[Any, Iterator[Any]](
-                dataDeserialize(blockId, bytes), releaseLock(blockId))
-              return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
-            } else {
-              return Some(bytes)
-            }
-          } else {
-            // Otherwise, we also have to store something in the memory store
-            if (!level.deserialized || !asBlockResult) {
-              /* We'll store the bytes in memory if the block's storage level includes
-               * "memory serialized", or if it should be cached as objects in memory
-               * but we only requested its serialized bytes. */
-              memoryStore.putBytes(blockId, bytes.limit, () => {
-                // https://issues.apache.org/jira/browse/SPARK-6076
-                // If the file size is bigger than the free memory, OOM will happen. So if we cannot
-                // put it into MemoryStore, copyForMemory should not be created. That's why this
-                // action is put into a `() => ByteBuffer` and created lazily.
-                val copyForMemory = ByteBuffer.allocate(bytes.limit)
-                copyForMemory.put(bytes)
-              })
-              bytes.rewind()
-            }
-            if (!asBlockResult) {
-              return Some(bytes)
-            } else {
-              val values = dataDeserialize(blockId, bytes)
-              if (level.deserialized) {
-                // Cache the values before returning them
-                val putResult = memoryStore.putIterator(
-                  blockId, values, level, returnValues = true, allowPersistToDisk = false)
-                // The put may or may not have succeeded, depending on whether there was enough
-                // space to unroll the block. Either way, the put here should return an iterator.
-                putResult.data match {
-                  case Left(it) =>
-                    val ci = CompletionIterator[Any, Iterator[Any]](it, releaseLock(blockId))
-                    return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
-                  case _ =>
-                    // This only happens if we dropped the values back to disk (which is never)
-                    throw new SparkException("Memory store did not return an iterator!")
-                }
-              } else {
-                val ci = CompletionIterator[Any, Iterator[Any]](values, releaseLock(blockId))
-                return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    // Look for block on disk, potentially storing it back in memory if required
+    if (level.useDisk) {
+      logDebug(s"Getting block $blockId from disk")
+      val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
+        case Some(b) => b
+        case None =>
+          releaseLock(blockId)
+          throw new BlockException(
+            blockId, s"Block $blockId not found on disk, though it should be")
+      }
+      assert(0 == bytes.position())
+
+      if (!level.useMemory) {
+        // If the block shouldn't be stored in memory, we can just return it
+        if (asBlockResult) {
+          val iter = CompletionIterator[Any, Iterator[Any]](
+            dataDeserialize(blockId, bytes), releaseLock(blockId))
+          return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
+        } else {
+          return Some(bytes)
+        }
+      } else {
+        // Otherwise, we also have to store something in the memory store
+        if (!level.deserialized || !asBlockResult) {
+          /* We'll store the bytes in memory if the block's storage level includes
+           * "memory serialized", or if it should be cached as objects in memory
+           * but we only requested its serialized bytes. */
+          memoryStore.putBytes(blockId, bytes.limit, () => {
+            // https://issues.apache.org/jira/browse/SPARK-6076
+            // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+            // put it into MemoryStore, copyForMemory should not be created. That's why this
+            // action is put into a `() => ByteBuffer` and created lazily.
+            val copyForMemory = ByteBuffer.allocate(bytes.limit)
+            copyForMemory.put(bytes)
+          })
+          bytes.rewind()
+        }
+        if (!asBlockResult) {
+          return Some(bytes)
+        } else {
+          val values = dataDeserialize(blockId, bytes)
+          val valuesToReturn: Iterator[Any] = {
+            if (level.deserialized) {
+              // Cache the values before returning them
+              memoryStore.putIterator(blockId, values, level, allowPersistToDisk = false) match {
+                case Left(iter) =>
+                  // The memory store put() failed, so it returned the iterator back to us:
+                  iter
+                case Right(_) =>
+                  // The put() succeeded, so we can read the values back:
+                  memoryStore.getValues(blockId).get
               }
+            } else {
+              values
             }
           }
-        } else {
-          // This branch represents a case where the BlockInfoManager contained an entry for
-          // the block but the block could not be found in any of the block stores. This case
-          // should never occur, but for completeness's sake we address it here.
-          logError(
-            s"Block $blockId is supposedly stored locally but was not found in any block store")
-          releaseLock(blockId)
-          None
+          val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, releaseLock(blockId))
+          return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         }
+      }
+    } else {
+      // This branch represents a case where the BlockInfoManager contained an entry for
+      // the block but the block could not be found in any of the block stores. This case
+      // should never occur, but for completeness's sake we address it here.
+      logError(
+        s"Block $blockId is supposedly stored locally but was not found in any block store")
+      releaseLock(blockId)
+      None
     }
   }
 
@@ -659,7 +675,7 @@ private[spark] class BlockManager(
      makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
     // Initially we hold no locks on this block.
     doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
-      case None =>
+      case DoPutSucceeded =>
         // doPut() didn't hand work back to us, so the block already existed or was successfully
         // stored. Therefore, we now hold a read lock on the block.
         val blockResult = get(blockId).getOrElse {
@@ -669,11 +685,13 @@ private[spark] class BlockManager(
           throw new SparkException(s"get() failed for block $blockId even though we held a lock")
         }
         Left(blockResult)
-      case Some(failedPutResult) =>
+      case DoPutIteratorFailed(iter) =>
         // The put failed, likely because the data was too large to fit in memory and could not be
         // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
         // that they can decide what to do with the values (e.g. process them without caching).
-       Right(failedPutResult.data.left.get)
+       Right(iter)
+      case DoPutBytesFailed =>
+        throw new SparkException("doPut returned an invalid failure response")
     }
   }
 
@@ -687,7 +705,13 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
+    val result = doPut(
+      blockId,
+      IteratorValues(() => values),
+      level,
+      tellMaster,
+      effectiveStorageLevel)
+    result == DoPutSucceeded
   }
 
   /**
@@ -719,7 +743,8 @@ private[spark] class BlockManager(
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
+    val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
+    result == DoPutSucceeded
   }
 
   /**
@@ -734,9 +759,9 @@ private[spark] class BlockManager(
   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
   *                     block already exists). If false, this method will hold no locks when it
   *                     returns.
-   * @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
-   *         or None if the block already existed or was successfully stored (fully consuming
-   *         the input data / input iterator).
+   * @return [[DoPutSucceeded]] if the block was already present or if the put succeeded, or
+   *        [[DoPutBytesFailed]] if the put failed and we were storing bytes, or
+   *        [[DoPutIteratorFailed]] if the put failed and we were storing an iterator.
    */
   private def doPut(
       blockId: BlockId,
@@ -744,7 +769,7 @@ private[spark] class BlockManager(
       level: StorageLevel,
       tellMaster: Boolean = true,
       effectiveStorageLevel: Option[StorageLevel] = None,
-      keepReadLock: Boolean = false): Option[PutResult] = {
+      keepReadLock: Boolean = false): DoPutResult = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -765,21 +790,12 @@ private[spark] class BlockManager(
           // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
           releaseLock(blockId)
         }
-        return None
+        return DoPutSucceeded
       }
     }
 
     val startTimeMs = System.currentTimeMillis
 
-    /* If we're storing values and we need to replicate the data, we'll want access to the values,
-     * but because our put will read the whole iterator, there will be no values left. For the
-     * case where the put serializes data, we'll remember the bytes, above; but for the case where
-     * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */
-    var valuesAfterPut: Iterator[Any] = null
-
-    // Ditto for the bytes after the put
-    var bytesAfterPut: ByteBuffer = null
-
     // Size of the block in bytes
     var size = 0L
 
@@ -801,43 +817,46 @@ private[spark] class BlockManager(
     }
 
     var blockWasSuccessfullyStored = false
-    var result: PutResult = null
+    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
 
     putBlockInfo.synchronized {
       logTrace("Put for block %s took %s to get into synchronized block"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
       try {
-        // returnValues - Whether to return the values put
-        // blockStore - The type of storage to put these values into
-        val (returnValues, blockStore: BlockStore) = {
-          if (putLevel.useMemory) {
-            // Put it in memory first, even if it also has useDisk set to true;
-            // We will drop it to disk later if the memory store can't hold it.
-            (true, memoryStore)
-          } else if (putLevel.useDisk) {
-            // Don't get back the bytes from put unless we replicate them
-            (putLevel.replication > 1, diskStore)
-          } else {
-            assert(putLevel == StorageLevel.NONE)
-            throw new BlockException(
-              blockId, s"Attempted to put block $blockId without specifying storage level!")
+        if (putLevel.useMemory) {
+          // Put it in memory first, even if it also has useDisk set to true;
+          // We will drop it to disk later if the memory store can't hold it.
+          data match {
+            case IteratorValues(iterator) =>
+              memoryStore.putIterator(blockId, iterator(), putLevel) match {
+                case Right(s) =>
+                  size = s
+                case Left(iter) =>
+                  iteratorFromFailedMemoryStorePut = Some(iter)
+              }
+            case ByteBufferValues(bytes) =>
+              bytes.rewind()
+              size = bytes.limit()
+              memoryStore.putBytes(blockId, bytes, putLevel)
           }
-        }
-
-        // Actually put the values
-        result = data match {
-          case IteratorValues(iterator) =>
-            blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
-          case ByteBufferValues(bytes) =>
-            bytes.rewind()
-            blockStore.putBytes(blockId, bytes, putLevel)
-        }
-        size = result.size
-        result.data match {
-          case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
-          case Right (newBytes) => bytesAfterPut = newBytes
-          case _ =>
+        } else if (putLevel.useDisk) {
+          data match {
+            case IteratorValues(iterator) =>
+              diskStore.putIterator(blockId, iterator(), putLevel) match {
+                case Right(s) =>
+                  size = s
+                // putIterator() will never return Left (see its return type).
+              }
+            case ByteBufferValues(bytes) =>
+              bytes.rewind()
+              size = bytes.limit()
+              diskStore.putBytes(blockId, bytes, putLevel)
+          }
+        } else {
+          assert(putLevel == StorageLevel.NONE)
+          throw new BlockException(
+            blockId, s"Attempted to put block $blockId without specifying storage level!")
         }
 
         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
@@ -868,34 +887,27 @@ private[spark] class BlockManager(
     }
     logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
 
-    // Either we're storing bytes and we asynchronously started replication, or we're storing
-    // values and need to serialize and replicate them now:
-    if (putLevel.replication > 1) {
-      data match {
-        case ByteBufferValues(bytes) =>
-          if (replicationFuture != null) {
-            Await.ready(replicationFuture, Duration.Inf)
-          }
-        case _ =>
-          if (blockWasSuccessfullyStored) {
-            val remoteStartTime = System.currentTimeMillis
-            // Serialize the block if not already done
-            if (bytesAfterPut == null) {
-              if (valuesAfterPut == null) {
-                throw new SparkException(
-                  "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
-              }
-              bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
-            }
-            replicate(blockId, bytesAfterPut, putLevel)
-            logDebug("Put block %s remotely took %s"
-              .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+    if (replicationFuture != null) {
+      // Wait for asynchronous replication to finish
+      Await.ready(replicationFuture, Duration.Inf)
+    } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
+      val remoteStartTime = System.currentTimeMillis
+      val bytesToReplicate: ByteBuffer = {
+        doGetLocal(blockId, putBlockInfo, asBlockResult = false)
+          .map(_.asInstanceOf[ByteBuffer])
+          .getOrElse {
+            throw new SparkException(s"Block $blockId was not found even though it was just stored")
           }
       }
+      try {
+        replicate(blockId, bytesToReplicate, putLevel)
+      } finally {
+        BlockManager.dispose(bytesToReplicate)
+      }
+      logDebug("Put block %s remotely took %s"
+        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
     }
 
-    BlockManager.dispose(bytesAfterPut)
-
     if (putLevel.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
@@ -905,9 +917,11 @@ private[spark] class BlockManager(
     }
 
     if (blockWasSuccessfullyStored) {
-      None
+      DoPutSucceeded
+    } else if (iteratorFromFailedMemoryStorePut.isDefined) {
+      DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
     } else {
-      Some(result)
+      DoPutBytesFailed
     }
   }
 
@@ -1064,7 +1078,7 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
+          diskStore.putIterator(blockId, elements.toIterator, level)
         case Right(bytes) =>
           diskStore.putBytes(blockId, bytes, level)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e52e597d/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index d3af50d..b069918 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -26,20 +26,18 @@ import org.apache.spark.Logging
  */
 private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
 
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
 
   /**
-   * Put in a block and, possibly, also return its content as either bytes or another Iterator.
-   * This is used to efficiently write the values to multiple locations (e.g. for replication).
+   * Attempt to store an iterator of values.
    *
-   * @return a PutResult that contains the size of the data, as well as the values put if
-   *         returnValues is true (if not, the result's data field can be null)
+   * @return an iterator of values (in case the put failed), or the estimated size of the stored
+   *         values if the put succeeded.
    */
   def putIterator(
-    blockId: BlockId,
-    values: Iterator[Any],
-    level: StorageLevel,
-    returnValues: Boolean): PutResult
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel): Either[Iterator[Any], Long]
 
   /**
    * Return the size of a block in bytes.

http://git-wip-us.apache.org/repos/asf/spark/blob/e52e597d/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index db12a4a..e35aa1b 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -36,7 +36,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     diskManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // So that we do not modify the input offsets !
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()
@@ -54,15 +54,12 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
-    PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-
+      level: StorageLevel): Right[Iterator[Any], Long] = {
     logDebug(s"Attempting to write values for block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
@@ -90,13 +87,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(length), timeTaken))
 
-    if (returnValues) {
-      // Return a byte buffer for the contents of the file
-      val buffer = getBytes(blockId).get
-      PutResult(length, Right(buffer))
-    } else {
-      PutResult(length, null)
-    }
+    Right(length)
   }
 
   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
@@ -127,10 +118,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     getBytes(file, 0, file.length)
   }
 
-  def getBytes(segment: FileSegment): Option[ByteBuffer] = {
-    getBytes(segment.file, segment.offset, segment.length)
-  }
-
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
     getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e52e597d/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 317d73a..12b70d1 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -87,16 +87,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
     if (level.deserialized) {
       val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level, returnValues = true)
+      putIterator(blockId, values, level)
     } else {
       tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 
@@ -106,26 +105,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    *
    * The caller should guarantee that `size` is correct.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
-    val data =
-      if (putSuccess) {
-        assert(bytes.limit == size)
-        Right(bytes.duplicate())
-      } else {
-        null
-      }
-    PutResult(size, data)
+    if (putSuccess) {
+      assert(bytes.limit == size)
+    }
   }
 
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
-      level: StorageLevel,
-      returnValues: Boolean): PutResult = {
-    putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+      level: StorageLevel): Either[Iterator[Any], Long] = {
+    putIterator(blockId, values, level, allowPersistToDisk = true)
   }
 
   /**
@@ -144,32 +137,30 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      returnValues: Boolean,
-      allowPersistToDisk: Boolean): PutResult = {
+      allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>
         // Values are fully unrolled in memory, so store them as an array
-        val res = {
+        val size = {
           if (level.deserialized) {
            val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
            tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
-            PutResult(sizeEstimate, Left(arrayValues.iterator))
+            sizeEstimate
           } else {
            val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
             tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            PutResult(bytes.limit(), Right(bytes.duplicate()))
+            bytes.limit()
           }
         }
-        PutResult(res.size, res.data)
+        Right(size)
       case Right(iteratorValues) =>
         // Not enough space to unroll this block; drop to disk if applicable
         if (level.useDisk && allowPersistToDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
-          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
-          PutResult(res.size, res.data)
+          blockManager.diskStore.putIterator(blockId, iteratorValues, level)
         } else {
-          PutResult(0, Left(iteratorValues))
+          Left(iteratorValues)
         }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/e52e597d/core/src/main/scala/org/apache/spark/storage/PutResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/PutResult.scala b/core/src/main/scala/org/apache/spark/storage/PutResult.scala
deleted file mode 100644
index f0eac75..0000000
--- a/core/src/main/scala/org/apache/spark/storage/PutResult.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-/**
- * Result of adding a block into a BlockStore. This case class contains a few things:
- *   (1) The estimated size of the put,
- *   (2) The values put if the caller asked for them to be returned (e.g. for chaining
- *       replication), and
- *   (3) A list of blocks dropped as a result of this put. This is always empty for DiskStore.
- */
-private[spark] case class PutResult(
-    size: Long,
-    data: Either[Iterator[_], ByteBuffer],
-    droppedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/e52e597d/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 89b4270..cfcbf17 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1156,14 +1156,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with plenty of space. This should succeed and cache both blocks.
-    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
-    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    val result1 = memoryStore.putIterator("b1", smallIterator, memOnly)
+    val result2 = memoryStore.putIterator("b2", smallIterator, memOnly)
     assert(memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
-    assert(result1.size > 0) // unroll was successful
-    assert(result2.size > 0)
-    assert(result1.data.isLeft) // unroll did not drop this block to disk
-    assert(result2.data.isLeft)
+    assert(result1.isRight) // unroll was successful
+    assert(result2.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
@@ -1174,9 +1172,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("b2", smallIterator, memOnly)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
-    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
-    assert(result3.size > 0)
-    assert(result3.data.isLeft)
+    val result3 = memoryStore.putIterator("b3", smallIterator, memOnly)
+    assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1185,9 +1182,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("b3", smallIterator, memOnly)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
-    assert(result4.size === 0) // unroll was unsuccessful
-    assert(result4.data.isLeft)
+    val result4 = memoryStore.putIterator("b4", bigIterator, memOnly)
+    assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1214,8 +1210,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
-    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
-    assert(result3.size > 0)
+    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk)
+    assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1229,9 +1225,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
     // directly in addition to kicking out b2 in the process. Memory store should contain only
     // b3, while disk store should contain b1, b2 and b4.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
-    assert(result4.size > 0)
-    assert(result4.data.isRight) // unroll returned bytes from disk
+    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk)
+    assert(result4.isRight)
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
@@ -1252,28 +1247,28 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // All unroll memory used is released because unrollSafely returned an array
-    memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b1", smallIterator, memOnly)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b2", smallIterator, memOnly)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll memory is not released because unrollSafely returned an iterator
     // that still depends on the underlying vector used in the process
-    memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b3", smallIterator, memOnly)
     val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB3 > 0)
 
     // The unroll memory owned by this thread builds on top of its value after the previous unrolls
-    memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b4", smallIterator, memOnly)
     val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
 
     // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
-    memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b5", smallIterator, memOnly)
     val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
-    memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b6", smallIterator, memOnly)
     val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
-    memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+    memoryStore.putIterator("b7", smallIterator, memOnly)
     val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
@@ -1286,11 +1281,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockId = BlockId("rdd_3_10")
     store.blockInfoManager.lockNewBlockForWriting(
       blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, tellMaster = false))
-    val result = memoryStore.putBytes(blockId, 13000, () => {
+    memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
     })
-    assert(result.size === 13000)
-    assert(result.data === null)
   }
 
   test("put a small ByteBuffer to MemoryStore") {
@@ -1298,12 +1291,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
     var bytes: ByteBuffer = null
-    val result = memoryStore.putBytes(blockId, 10000, () => {
+    memoryStore.putBytes(blockId, 10000, () => {
       bytes = ByteBuffer.allocate(10000)
       bytes
     })
-    assert(result.size === 10000)
-    assert(result.data === Right(bytes))
+    assert(memoryStore.getSize(blockId) === 10000)
   }
 
   test("read-locked blocks cannot be evicted from the MemoryStore") {

