Repository: spark
Updated Branches:
  refs/heads/master 3d3acef04 -> 40566e10a


SPARK-942: Do not materialize partitions when DISK_ONLY storage level is used

This is a port of a pull request original targeted at incubator-spark: 
https://github.com/apache/incubator-spark/pull/180

Essentially if a user returns a generative iterator (from a flatMap operation), 
when trying to persist the data, Spark would first unroll the iterator into an 
ArrayBuffer, and then try to figure out if it could store the data. In cases 
where the user provided an iterator that generated more data then available 
memory, this would case a crash. With this patch, if the user requests a 
persist with a 'StorageLevel.DISK_ONLY', the iterator will be unrolled as it is 
inputed into the serializer.

To do this, two changes where made:
1) The type of the 'values' argument in the putValues method of the BlockStore 
interface was changed from ArrayBuffer to Iterator (and all code interfacing 
with this method was modified to connect correctly.
2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every 
1000 objects. This was done because the ObjectOutputStream caches objects (thus 
preventing them from being GC'd) to write more compact serialization. If reset 
is never called, eventually the memory fills up, if it is called too often then 
the serialization streams become much larger because of redundant class 
descriptions.

Author: Kyle Ellrott <kellr...@gmail.com>

Closes #50 from kellrott/iterator-to-disk and squashes the following commits:

9ef7cb8 [Kyle Ellrott] Fixing formatting issues.
60e0c57 [Kyle Ellrott] Fixing issues (formatting, variable names, etc.) from 
review comments
8aa31cd [Kyle Ellrott] Merge ../incubator-spark into iterator-to-disk
33ac390 [Kyle Ellrott] Merge branch 'iterator-to-disk' of 
github.com:kellrott/incubator-spark into iterator-to-disk
2f684ea [Kyle Ellrott] Refactoring the BlockManager to replace the 
Either[Either[A,B]] usage. Now using trait 'Values'. Also modified 
BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
f70d069 [Kyle Ellrott] Adding docs for spark.serializer.objectStreamReset 
configuration
7ccc74b [Kyle Ellrott] Moving the 'LargeIteratorSuite' to simply test 
persistance of iterators. It doesn't try to invoke a OOM error any more
16a4cea [Kyle Ellrott] Streamlined the LargeIteratorSuite unit test. It should 
now run in ~25 seconds. Confirmed that it still crashes an unpatched copy of 
Spark.
c2fb430 [Kyle Ellrott] Removing more un-needed array-buffer to iterator 
conversions
627a8b7 [Kyle Ellrott] Wrapping a few long lines
0f28ec7 [Kyle Ellrott] Adding second putValues to BlockStore interface that 
accepts an ArrayBuffer (rather then an Iterator). This will allow BlockStores 
to have slightly different behaviors dependent on whether they get an Iterator 
or ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache 
an Iterator into an ArrayBuffer, but if handed a ArrayBuffer, it can skip the 
duplication.
656c33e [Kyle Ellrott] Fixing the JavaSerializer to read from the SparkConf 
rather then the System property.
8644ee8 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
00c98e0 [Kyle Ellrott] Making the Java ObjectStreamSerializer reset rate 
configurable by the system variable 'spark.serializer.objectStreamReset', 
default is not 10000.
40fe1d7 [Kyle Ellrott] Removing rouge space
31fe08e [Kyle Ellrott] Removing un-needed semi-colons
9df0276 [Kyle Ellrott] Added check to make sure that streamed-to-dist RDD 
actually returns good data in the LargeIteratorSuite
a6424ba [Kyle Ellrott] Wrapping long line
2eeda75 [Kyle Ellrott] Fixing dumb mistake ("||" instead of "&&")
0e6f808 [Kyle Ellrott] Deleting temp output directory when done
95c7f67 [Kyle Ellrott] Simplifying StorageLevel checks
56f71cd [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
44ec35a [Kyle Ellrott] Adding some comments.
5eb2b7e [Kyle Ellrott] Changing the JavaSerializer reset to occur every 1000 
objects.
f403826 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
81d670c [Kyle Ellrott] Adding unit test for straight to disk iterator methods.
d32992f [Kyle Ellrott] Merge remote-tracking branch 'origin/master' into 
iterator-to-disk
cac1fad [Kyle Ellrott] Fixing MemoryStore, so that it converts incoming 
iterators to ArrayBuffer objects. This was previously done higher up the stack.
efe1102 [Kyle Ellrott] Changing CacheManager and BlockManager to pass iterators 
directly to the serializer when a 'DISK_ONLY' persist is called. This is in 
response to SPARK-942.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40566e10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40566e10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40566e10

Branch: refs/heads/master
Commit: 40566e10aae4b21ffc71ea72702b8df118ac5c8e
Parents: 3d3acef
Author: Kyle Ellrott <kellr...@gmail.com>
Authored: Thu Mar 6 14:51:00 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Mar 6 14:51:19 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/CacheManager.scala   | 28 ++++++-
 .../spark/serializer/JavaSerializer.scala       | 29 +++++--
 .../org/apache/spark/storage/BlockManager.scala | 87 ++++++++++++--------
 .../org/apache/spark/storage/BlockStore.scala   |  5 +-
 .../org/apache/spark/storage/DiskStore.scala    | 14 +++-
 .../org/apache/spark/storage/MemoryStore.scala  | 31 +++++--
 .../spark/storage/FlatmapIteratorSuite.scala    | 74 +++++++++++++++++
 docs/configuration.md                           | 11 +++
 8 files changed, 226 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala 
b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 1daabec..872e892 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -71,10 +71,30 @@ private[spark] class CacheManager(blockManager: 
BlockManager) extends Logging {
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
-          val elements = new ArrayBuffer[Any]
-          elements ++= computedValues
-          blockManager.put(key, elements, storageLevel, tellMaster = true)
-          elements.iterator.asInstanceOf[Iterator[T]]
+          if (storageLevel.useDisk && !storageLevel.useMemory) {
+            // In the case that this RDD is to be persisted using DISK_ONLY
+            // the iterator will be passed directly to the blockManager 
(rather then
+            // caching it to an ArrayBuffer first), then the resulting block 
data iterator
+            // will be passed back to the user. If the iterator generates a 
lot of data,
+            // this means that it doesn't all have to be held in memory at one 
time.
+            // This could also apply to MEMORY_ONLY_SER storage, but we need 
to make sure
+            // blocks aren't dropped by the block store before enabling that.
+            blockManager.put(key, computedValues, storageLevel, tellMaster = 
true)
+            return blockManager.get(key) match {
+              case Some(values) =>
+                return new InterruptibleIterator(context, 
values.asInstanceOf[Iterator[T]])
+              case None =>
+                logInfo("Failure to store %s".format(key))
+                throw new Exception("Block manager failed to return persisted 
valued")
+            }
+          } else {
+            // In this case the RDD is cached to an array buffer. This will 
save the results
+            // if we're dealing with a 'one-time' iterator
+            val elements = new ArrayBuffer[Any]
+            elements ++= computedValues
+            blockManager.put(key, elements, storageLevel, tellMaster = true)
+            return elements.iterator.asInstanceOf[Iterator[T]]
+          }
         } finally {
           loading.synchronized {
             loading.remove(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 33c1705..bfa647f 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -23,9 +23,28 @@ import java.nio.ByteBuffer
 import org.apache.spark.SparkConf
 import org.apache.spark.util.ByteBufferInputStream
 
-private[spark] class JavaSerializationStream(out: OutputStream) extends 
SerializationStream {
+private[spark] class JavaSerializationStream(out: OutputStream, conf: 
SparkConf)
+  extends SerializationStream {
   val objOut = new ObjectOutputStream(out)
-  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); 
this }
+  var counter = 0
+  val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)
+
+  /**
+   * Calling reset to avoid memory leak:
+   * 
http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
+   * But only call it every 10,000th time to avoid bloated serialization 
streams (when
+   * the stream 'resets' object class descriptions have to be re-written)
+   */
+  def writeObject[T](t: T): SerializationStream = {
+    objOut.writeObject(t)
+    if (counterReset > 0 && counter >= counterReset) {
+      objOut.reset()
+      counter = 0
+    } else {
+      counter += 1
+    }
+    this
+  }
   def flush() { objOut.flush() }
   def close() { objOut.close() }
 }
@@ -41,7 +60,7 @@ extends DeserializationStream {
   def close() { objIn.close() }
 }
 
-private[spark] class JavaSerializerInstance extends SerializerInstance {
+private[spark] class JavaSerializerInstance(conf: SparkConf) extends 
SerializerInstance {
   def serialize[T](t: T): ByteBuffer = {
     val bos = new ByteArrayOutputStream()
     val out = serializeStream(bos)
@@ -63,7 +82,7 @@ private[spark] class JavaSerializerInstance extends 
SerializerInstance {
   }
 
   def serializeStream(s: OutputStream): SerializationStream = {
-    new JavaSerializationStream(s)
+    new JavaSerializationStream(s, conf)
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
@@ -79,5 +98,5 @@ private[spark] class JavaSerializerInstance extends 
SerializerInstance {
  * A Spark serializer that uses Java's built-in serialization.
  */
 class JavaSerializer(conf: SparkConf) extends Serializer {
-  def newInstance(): SerializerInstance = new JavaSerializerInstance
+  def newInstance(): SerializerInstance = new JavaSerializerInstance(conf)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a734ddc..977c246 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,6 +35,12 @@ import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util._
 
+sealed trait Values
+
+case class ByteBufferValues(buffer: ByteBuffer) extends Values
+case class IteratorValues(iterator: Iterator[Any]) extends Values
+case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values
+
 private[spark] class BlockManager(
     executorId: String,
     actorSystem: ActorSystem,
@@ -455,9 +461,7 @@ private[spark] class BlockManager(
 
   def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, 
tellMaster: Boolean)
     : Long = {
-    val elements = new ArrayBuffer[Any]
-    elements ++= values
-    put(blockId, elements, level, tellMaster)
+    doPut(blockId, IteratorValues(values), level, tellMaster)
   }
 
   /**
@@ -479,7 +483,7 @@ private[spark] class BlockManager(
   def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
           tellMaster: Boolean = true) : Long = {
     require(values != null, "Values is null")
-    doPut(blockId, Left(values), level, tellMaster)
+    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
   }
 
   /**
@@ -488,10 +492,11 @@ private[spark] class BlockManager(
   def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
                tellMaster: Boolean = true) {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, Right(bytes), level, tellMaster)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
   }
 
-  private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], 
ByteBuffer],
+  private def doPut(blockId: BlockId,
+                    data: Values,
                     level: StorageLevel, tellMaster: Boolean = true): Long = {
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -534,8 +539,9 @@ private[spark] class BlockManager(
 
     // If we're storing bytes, then initiate the replication before storing 
them locally.
     // This is faster as data is already serialized and ready to send.
-    val replicationFuture = if (data.isRight && level.replication > 1) {
-      val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, 
just creates a wrapper
+    val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && 
level.replication > 1) {
+      // Duplicate doesn't copy the bytes, just creates a wrapper
+      val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
       Future {
         replicate(blockId, bufferView, level)
       }
@@ -549,34 +555,43 @@ private[spark] class BlockManager(
 
       var marked = false
       try {
-        data match {
-          case Left(values) => {
-            if (level.useMemory) {
-              // Save it just to memory first, even if it also has useDisk set 
to true; we will
-              // drop it to disk later if the memory store can't hold it.
-              val res = memoryStore.putValues(blockId, values, level, true)
-              size = res.size
-              res.data match {
-                case Right(newBytes) => bytesAfterPut = newBytes
-                case Left(newIterator) => valuesAfterPut = newIterator
-              }
-            } else {
-              // Save directly to disk.
-              // Don't get back the bytes unless we replicate them.
-              val askForBytes = level.replication > 1
-              val res = diskStore.putValues(blockId, values, level, 
askForBytes)
-              size = res.size
-              res.data match {
-                case Right(newBytes) => bytesAfterPut = newBytes
-                case _ =>
-              }
+        if (level.useMemory) {
+          // Save it just to memory first, even if it also has useDisk set to 
true; we will
+          // drop it to disk later if the memory store can't hold it.
+          val res = data match {
+            case IteratorValues(iterator) =>
+              memoryStore.putValues(blockId, iterator, level, true)
+            case ArrayBufferValues(array) =>
+              memoryStore.putValues(blockId, array, level, true)
+            case ByteBufferValues(bytes) => {
+              bytes.rewind();
+              memoryStore.putBytes(blockId, bytes, level)
+            }
+          }
+          size = res.size
+          res.data match {
+            case Right(newBytes) => bytesAfterPut = newBytes
+            case Left(newIterator) => valuesAfterPut = newIterator
+          }
+        } else {
+          // Save directly to disk.
+          // Don't get back the bytes unless we replicate them.
+          val askForBytes = level.replication > 1
+
+          val res = data match {
+            case IteratorValues(iterator) =>
+              diskStore.putValues(blockId, iterator, level, askForBytes)
+            case ArrayBufferValues(array) =>
+              diskStore.putValues(blockId, array, level, askForBytes)
+            case ByteBufferValues(bytes) => {
+              bytes.rewind();
+              diskStore.putBytes(blockId, bytes, level)
             }
           }
-          case Right(bytes) => {
-            bytes.rewind()
-            // Store it only in memory at first, even if useDisk is also set 
to true
-            (if (level.useMemory) memoryStore else 
diskStore).putBytes(blockId, bytes, level)
-            size = bytes.limit
+          size = res.size
+          res.data match {
+            case Right(newBytes) => bytesAfterPut = newBytes
+            case _ =>
           }
         }
 
@@ -605,8 +620,8 @@ private[spark] class BlockManager(
     // values and need to serialize and replicate them now:
     if (level.replication > 1) {
       data match {
-        case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
-        case Left(values) => {
+        case ByteBufferValues(bytes) => Await.ready(replicationFuture, 
Duration.Inf)
+        case _ => {
           val remoteStartTime = System.currentTimeMillis
           // Serialize the block if not already done
           if (bytesAfterPut == null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index b047644..9a9be04 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.Logging
  */
 private[spark]
 abstract class BlockStore(val blockManager: BlockManager) extends Logging {
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel)
+  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : 
PutResult
 
   /**
    * Put in a block and, possibly, also return its content as either bytes or 
another Iterator.
@@ -37,6 +37,9 @@ abstract class BlockStore(val blockManager: BlockManager) 
extends Logging {
    * @return a PutResult that contains the size of the data, as well as the 
values put if
    *         returnValues is true (if not, the result's data field can be null)
    */
+  def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
+    returnValues: Boolean) : PutResult
+
   def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: 
StorageLevel,
     returnValues: Boolean) : PutResult
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index d1f07dd..36ee4bc 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, 
diskManager: DiskBlockManage
     diskManager.getBlockLocation(blockId).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel) {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel) : PutResult = {
     // So that we do not modify the input offsets !
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()
@@ -52,6 +52,7 @@ private class DiskStore(blockManager: BlockManager, 
diskManager: DiskBlockManage
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), (finishTime - 
startTime)))
+    return PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
   override def putValues(
@@ -59,13 +60,22 @@ private class DiskStore(blockManager: BlockManager, 
diskManager: DiskBlockManage
       values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean)
+  : PutResult = {
+    return putValues(blockId, values.toIterator, level, returnValues)
+  }
+
+  override def putValues(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean)
     : PutResult = {
 
     logDebug("Attempting to write values for block " + blockId)
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
-    blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
+    blockManager.dataSerializeStream(blockId, outputStream, values)
     val length = file.length
 
     val timeTaken = System.currentTimeMillis - startTime

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1814175..b89212e 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel) {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel) : PutResult = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
@@ -59,8 +59,10 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
       tryToPut(blockId, elements, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(values.toIterator))
     } else {
       tryToPut(blockId, bytes, bytes.limit, false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 
@@ -69,14 +71,33 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
       values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean)
-    : PutResult = {
-
+  : PutResult = {
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
       tryToPut(blockId, values, sizeEstimate, true)
-      PutResult(sizeEstimate, Left(values.iterator))
+      PutResult(sizeEstimate, Left(values.toIterator))
+    } else {
+      val bytes = blockManager.dataSerialize(blockId, values.toIterator)
+      tryToPut(blockId, bytes, bytes.limit, false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
+    }
+  }
+
+  override def putValues(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean)
+    : PutResult = {
+
+    if (level.deserialized) {
+      val valueEntries = new ArrayBuffer[Any]()
+      valueEntries ++= values
+      val sizeEstimate = 
SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
+      tryToPut(blockId, valueEntries, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(valueEntries.toIterator))
     } else {
-      val bytes = blockManager.dataSerialize(blockId, values.iterator)
+      val bytes = blockManager.dataSerialize(blockId, values)
       tryToPut(blockId, bytes, bytes.limit, false)
       PutResult(bytes.limit(), Right(bytes.duplicate()))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
new file mode 100644
index 0000000..b843b4c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.{SharedSparkContext, SparkConf, LocalSparkContext, 
SparkContext}
+
+
+class FlatmapIteratorSuite extends FunSuite with LocalSparkContext {
+  /* Tests the ability of Spark to deal with user provided iterators from 
flatMap
+   * calls, that may generate more data then available memory. In any
+   * memory based persistance Spark will unroll the iterator into an 
ArrayBuffer
+   * for caching, however in the case that the use defines DISK_ONLY 
persistance,
+   * the iterator will be fed directly to the serializer and written to disk.
+   *
+   * This also tests the ObjectOutputStream reset rate. When serializing using 
the
+   * Java serialization system, the serializer caches objects to prevent 
writing redundant
+   * data, however that stops GC of those objects. By calling 'reset' you 
flush that
+   * info from the serializer, and allow old objects to be GC'd
+   */
+  test("Flatmap Iterator to Disk") {
+    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
+      .setAppName("iterator_to_disk_test")
+    sc = new SparkContext(sconf)
+    val expand_size = 100
+    val data = sc.parallelize((1 to 5).toSeq).
+      flatMap( x => Stream.range(0, expand_size))
+    var persisted = data.persist(StorageLevel.DISK_ONLY)
+    println(persisted.count())
+    assert(persisted.count()===500)
+    assert(persisted.filter(_==1).count()===5)
+  }
+
+  test("Flatmap Iterator to Memory") {
+    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
+      .setAppName("iterator_to_disk_test")
+    sc = new SparkContext(sconf)
+    val expand_size = 100
+    val data = sc.parallelize((1 to 5).toSeq).
+      flatMap(x => Stream.range(0, expand_size))
+    var persisted = data.persist(StorageLevel.MEMORY_ONLY)
+    println(persisted.count())
+    assert(persisted.count()===500)
+    assert(persisted.filter(_==1).count()===5)
+  }
+
+  test("Serializer Reset") {
+    val sconf = new SparkConf().setMaster("local-cluster[1,1,512]")
+      .setAppName("serializer_reset_test")
+      .set("spark.serializer.objectStreamReset", "10")
+    sc = new SparkContext(sconf)
+    val expand_size = 500
+    val data = sc.parallelize(Seq(1,2)).
+      flatMap(x => Stream.range(1, expand_size).
+      map(y => "%d: string test %d".format(y,x)))
+    var persisted = data.persist(StorageLevel.MEMORY_ONLY_SER)
+    assert(persisted.filter(_.startsWith("1:")).count()===2)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40566e10/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index dc5553f..017d509 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -245,6 +245,17 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>spark.serializer.objectStreamReset</td>
+  <td>10000</td>
+  <td>
+    When serializing using org.apache.spark.serializer.JavaSerializer, the 
serializer caches 
+    objects to prevent writing redundant data, however that stops garbage 
collection of those 
+    objects. By calling 'reset' you flush that info from the serializer, and 
allow old 
+    objects to be collected. To turn off this periodic reset set it to a value 
of <= 0. 
+    By default it will reset the serializer every 10,000 objects.
+  </td>
+</tr>
+<tr>
   <td>spark.broadcast.factory</td>
   <td>org.apache.spark.broadcast.<br />HttpBroadcastFactory</td>
   <td>

Reply via email to