Updated Branches:
  refs/heads/branch-0.9 e5f8917fd -> b6fd3cd33

Merge pull request #480 from pwendell/0.9-fixes

Handful of 0.9 fixes

This patch addresses a few fixes for Spark 0.9.0 based on the last release 
candidate.

@mridulm gets credit for reporting most of the issues here. Many of the fixes 
here are based on his work in #477 and follow up discussion with him.
(cherry picked from commit 77b986f6616e6f7e0be9e46bb355829686f9845b)

Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b6fd3cd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b6fd3cd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b6fd3cd3

Branch: refs/heads/branch-0.9
Commit: b6fd3cd33d667e6fda517c7c491462b68c48145c
Parents: e5f8917
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Tue Jan 21 00:09:42 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Jan 21 00:12:01 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 10 ++++-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  4 +-
 .../apache/spark/scheduler/TaskSetManager.scala |  2 +-
 .../spark/storage/BlockObjectWriter.scala       |  5 ++-
 .../spark/storage/ShuffleBlockManager.scala     | 16 ++++++--
 .../util/collection/ExternalAppendOnlyMap.scala | 41 ++++++++++++++++----
 docs/configuration.md                           | 11 ++----
 7 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 951bfd7..45d19bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }
 
   /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] =  getAll.filter {case (k, v) => k.startsWith("akka.")}
+  def getAkkaConf: Seq[(String, String)] =
+    /* This is currently undocumented. If we want to make this public we should consider
+     * nesting options under the spark namespace to avoid conflicts with user akka options.
+     * Otherwise users configuring their own akka code via system properties could mess up
+     * spark's akka options.
+     *
+     *   E.g. spark.akka.option.x.y.x = "value"
+     */
+    getAll.filter {case (k, v) => k.startsWith("akka.")}
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
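For illustration, a minimal sketch (not part of this patch) of the namespacing idea the new comment proposes: options set under a hypothetical spark.akka.option. prefix would have that prefix stripped before being handed to Akka, so they could not collide with a user's own akka.* settings.

    import org.apache.spark.SparkConf

    // Hypothetical helper; the "spark.akka.option." prefix is an assumption,
    // not an actual Spark 0.9 configuration namespace.
    def nestedAkkaConf(conf: SparkConf): Seq[(String, String)] = {
      val prefix = "spark.akka.option."
      conf.getAll.toSeq.collect {
        case (k, v) if k.startsWith(prefix) => (k.stripPrefix(prefix), v)
      }
    }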

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 83109d1..30e578d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
     val numPartitions =
     // listStatus can throw exception if path does not exist.
     if (fs.exists(cpath)) {
-      val dirContents = fs.listStatus(cpath)
-      val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+      val dirContents = fs.listStatus(cpath).map(_.getPath)
+      val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
       val numPart =  partitionFiles.size
       if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
           ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
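The switch from contains to startsWith matters because the old filter matched against the full path string: a checkpoint directory whose own path happens to contain "part-" would also admit non-partition files. A small illustration (not Spark code), assuming such a path:

    import org.apache.hadoop.fs.Path

    val files = Seq(
      new Path("/data/part-2014/checkpoint/_metadata"),
      new Path("/data/part-2014/checkpoint/part-00000"))

    // Old filter: keeps both entries because the parent directory contains "part-".
    files.map(_.toString).filter(_.contains("part-"))

    // New filter: keeps only the real partition file, part-00000.
    files.filter(_.getName.startsWith("part-")).map(_.toString)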

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07..73d6972 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
 
   /** Check whether a task is currently running an attempt on a given host */
   private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
-    !taskAttempts(taskIndex).exists(_.host == host)
+    taskAttempts(taskIndex).exists(_.host == host)
   }
 
   /**
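The one-character change above removes an inverted condition: with the leading "!", hasAttemptOnHost answered the opposite of what its doc comment says, so a speculative copy of a task could be scheduled on a host that was already running an attempt of it. A standalone sketch of the intended semantics (illustrative names, not the real TaskSetManager state):

    case class TaskAttempt(taskIndex: Int, host: String)

    def hasAttemptOnHost(attempts: Seq[TaskAttempt], taskIndex: Int, host: String): Boolean =
      attempts.exists(a => a.taskIndex == taskIndex && a.host == host)

    val running = Seq(TaskAttempt(taskIndex = 3, host = "host-a"))
    hasAttemptOnHost(running, 3, "host-a")  // true  -> skip this host for another copy
    hasAttemptOnHost(running, 3, "host-b")  // false -> host-b remains a candidate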

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 48cec4b..530712b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
       fos = null
       ts = null
       objOut = null
+      initialized = false
     }
   }
 
@@ -145,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
 
   override def commit(): Long = {
     if (initialized) {
-      // NOTE: Flush the serializer first and then the compressed/buffered output stream
+      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+      //       serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
@@ -175,7 +177,6 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    val bytesWritten = lastValidPosition - initialPosition
     new FileSegment(file, initialPosition, bytesWritten)
   }
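A generic sketch of the flush ordering the new comment describes, using plain JDK streams rather than Spark's DiskBlockObjectWriter: when the serialization layer buffers internally (as Kryo does) and does not flush the stream beneath it, both layers must be flushed before the file position can be trusted.

    import java.io.{BufferedOutputStream, FileOutputStream, ObjectOutputStream}

    val fos = new FileOutputStream("/tmp/flush-example.bin")
    val buffered = new BufferedOutputStream(fos)   // stands in for the compressed/buffered layer
    val objOut = new ObjectOutputStream(buffered)  // stands in for the serializer stream

    objOut.writeObject("some record")
    objOut.flush()      // flush the serializer's own buffer first
    buffered.flush()    // then the lower-level stream
    val validPosition = fos.getChannel.position()  // only now reflects everything written
    objOut.close()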
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b2429..bb07c8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   def conf = blockManager.conf
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,15 @@ class ShuffleBlockManager(blockManager: BlockManager) {
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
           val blockFile = blockManager.diskBlockManager.getFile(blockId)
+          // Because of previous failures, the shuffle file may already exist on this machine.
+          // If so, remove it.
+          if (blockFile.exists) {
+            if (blockFile.delete()) {
+              logInfo(s"Removed existing shuffle file $blockFile")
+            } else {
+              logWarning(s"Failed to remove existing shuffle file $blockFile")
+            }
+          }
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
         }
       }
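One detail worth noting about the change above: java.io.File.delete() reports failure through its boolean return value rather than by throwing, which is why the new code logs a warning and continues instead of wrapping the call in a try/catch. A stripped-down version of the same pattern, with an illustrative path rather than Spark's real block file layout:

    import java.io.File

    val blockFile = new File("/tmp/shuffle_0_0_0")  // illustrative path only
    if (blockFile.exists) {
      if (blockFile.delete()) println(s"Removed existing shuffle file $blockFile")
      else println(s"Failed to remove existing shuffle file $blockFile")
    }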

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b43..fb73636 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is 
insufficient space for it
@@ -153,9 +154,33 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
 
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+    *  closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    def wrapForCompression(outputStream: OutputStream) = {
+      if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
 
     var writer = getNewWriter
     var objectsWritten = 0
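A generic sketch of the batching pattern the note above describes, using plain JDK object streams instead of Spark's writers: records are spilled in fixed-size batches, and the stream is closed and re-opened between batches so the serializer never retains a large object graph.

    import java.io.{File, FileOutputStream, ObjectOutputStream}

    def spillInBatches(records: Iterator[(String, Int)], path: String, batchSize: Int): Long = {
      def newStream() = new ObjectOutputStream(new FileOutputStream(path, /* append = */ true))
      var out = newStream()
      var inBatch = 0
      for (record <- records) {
        out.writeObject(record)
        inBatch += 1
        if (inBatch == batchSize) {
          out.close()        // finish this batch; nothing is retained across batches
          out = newStream()  // fresh stream for the next batch
          inBatch = 0
        }
      }
      out.close()
      new File(path).length()  // total bytes spilled to this file
    }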
@@ -168,6 +193,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           writer.commit()
+          writer.close()
+          _diskBytesSpilled += writer.bytesWritten
           writer = getNewWriter
           objectsWritten = 0
         }
@@ -176,8 +203,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       if (objectsWritten > 0) writer.commit()
     } finally {
       // Partial failures cannot be tolerated; do not revert partial writes
-      _diskBytesSpilled += writer.bytesWritten
       writer.close()
+      _diskBytesSpilled += writer.bytesWritten
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file, blockId))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0086490..be548e3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec, 
+    regardless of the value of `spark.io.compression.codec`.
   </td>
 </tr>
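For illustration, a minimal sketch of how these settings combine in 0.9 (values here are arbitrary): spark.shuffle.spill.compress decides whether spills are compressed at all, while spark.io.compression.codec selects the codec used for other compressed data but, per the note above, not for spills.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("spill-compression-example")
      .set("spark.shuffle.spill.compress", "true")  // spills compressed, always with LZF
      .set("spark.io.compression.codec",
           "org.apache.spark.io.SnappyCompressionCodec")  // used elsewhere, not for spills
    val sc = new SparkContext(conf)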
 <tr>
@@ -379,13 +381,6 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
-<tr>
-  <td>akka.x.y....</td>
-  <td>value</td>
-  <td>
-    An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
-  </td>
-</tr>
 
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
