Updated Branches: refs/heads/branch-0.9 e5f8917fd -> b6fd3cd33
Merge pull request #480 from pwendell/0.9-fixes

Handful of 0.9 fixes

This patch addresses a few fixes for Spark 0.9.0 based on the last release
candidate.

@mridulm gets credit for reporting most of the issues here. Many of the fixes
here are based on his work in #477 and follow up discussion with him.

(cherry picked from commit 77b986f6616e6f7e0be9e46bb355829686f9845b)
Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b6fd3cd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b6fd3cd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b6fd3cd3

Branch: refs/heads/branch-0.9
Commit: b6fd3cd33d667e6fda517c7c491462b68c48145c
Parents: e5f8917
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Tue Jan 21 00:09:42 2014 -0800
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Tue Jan 21 00:12:01 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala | 10 ++++-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |  4 +-
 .../apache/spark/scheduler/TaskSetManager.scala |  2 +-
 .../spark/storage/BlockObjectWriter.scala       |  5 ++-
 .../spark/storage/ShuffleBlockManager.scala     | 16 ++++++--
 .../util/collection/ExternalAppendOnlyMap.scala | 41 ++++++++++++++++----
 docs/configuration.md                           | 11 ++----
 7 files changed, 65 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 951bfd7..45d19bc 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -192,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   }
 
   /** Get all akka conf variables set on this SparkConf */
-  def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+  def getAkkaConf: Seq[(String, String)] =
+    /* This is currently undocumented. If we want to make this public we should consider
+     * nesting options under the spark namespace to avoid conflicts with user akka options.
+     * Otherwise users configuring their own akka code via system properties could mess up
+     * spark's akka options.
+     *
+     * E.g. spark.akka.option.x.y.x = "value"
+     */
+    getAll.filter {case (k, v) => k.startsWith("akka.")}
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)

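As a rough sketch of the namespacing idea floated in the comment above, options set as
"spark.akka.option.<akka key>" could be stripped of their prefix before being handed to
Akka, so user-level "akka.*" system properties no longer collide with Spark's own
settings. The helper below is purely illustrative and does not exist in Spark; its name
and the key prefix are made up:

    import org.apache.spark.SparkConf

    // Illustrative only: map hypothetical "spark.akka.option.*" keys to plain akka keys.
    def namespacedAkkaConf(conf: SparkConf): Seq[(String, String)] = {
      val prefix = "spark.akka.option."
      conf.getAll.collect {
        case (key, value) if key.startsWith(prefix) => (key.stripPrefix(prefix), value)
      }
    }
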
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 83109d1..30e578d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
   val numPartitions =
   // listStatus can throw exception if path does not exist.
     if (fs.exists(cpath)) {
-      val dirContents = fs.listStatus(cpath)
-      val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+      val dirContents = fs.listStatus(cpath).map(_.getPath)
+      val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
       val numPart = partitionFiles.size
       if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
           ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {

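The filter change above is subtle: the old code looked for "part-" anywhere in the full
path string, so a checkpoint directory whose own name happens to contain "part-" made
every file inside it match. Matching on the file name prefix keeps only the actual
partition files. A toy illustration, not Spark code; the paths are made up:

    import org.apache.hadoop.fs.Path

    // Made-up example paths: the directory name itself contains "part-".
    val contents = Seq(
      new Path("/checkpoints/rdd-part-42/part-00000"),
      new Path("/checkpoints/rdd-part-42/part-00001"),
      new Path("/checkpoints/rdd-part-42/_metadata"))

    // Old behaviour: substring match on the whole path keeps "_metadata" too.
    val oldMatch = contents.map(_.toString).filter(_.contains("part-")).sorted

    // New behaviour: only files whose name starts with "part-" survive.
    val newMatch = contents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
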
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index fc0ee07..73d6972 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
 
   /** Check whether a task is currently running an attempt on a given host */
   private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
-    !taskAttempts(taskIndex).exists(_.host == host)
+    taskAttempts(taskIndex).exists(_.host == host)
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 48cec4b..530712b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
       fos = null
       ts = null
       objOut = null
+      initialized = false
     }
   }
 
@@ -145,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
 
   override def commit(): Long = {
     if (initialized) {
-      // NOTE: Flush the serializer first and then the compressed/buffered output stream
+      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+      // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
@@ -175,7 +177,6 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    val bytesWritten = lastValidPosition - initialPosition
     new FileSegment(file, initialPosition, bytesWritten)
   }
 


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b2429..bb07c8c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConversions._
 
+import org.apache.spark.Logging
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
 /** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
  * files within a ShuffleFileGroups associated with the block's reducer.
  */
 private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   def conf = blockManager.conf
 
   // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,15 @@ class ShuffleBlockManager(blockManager: BlockManager) {
       Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
         val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
         val blockFile = blockManager.diskBlockManager.getFile(blockId)
+        // Because of previous failures, the shuffle file may already exist on this machine.
+        // If so, remove it.
+        if (blockFile.exists) {
+          if (blockFile.delete()) {
+            logInfo(s"Removed existing shuffle file $blockFile")
+          } else {
+            logWarning(s"Failed to remove existing shuffle file $blockFile")
+          }
+        }
         blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
       }
     }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b43..fb73636 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,33 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
 
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+     * closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
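+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    def wrapForCompression(outputStream: OutputStream) = {
+      if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
 
     var writer = getNewWriter
     var objectsWritten = 0
@@ -168,6 +193,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
 
         if (objectsWritten == serializerBatchSize) {
           writer.commit()
+          writer.close()
+          _diskBytesSpilled += writer.bytesWritten
           writer = getNewWriter
           objectsWritten = 0
         }
@@ -176,8 +203,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       if (objectsWritten > 0) writer.commit()
     } finally {
       // Partial failures cannot be tolerated; do not revert partial writes
-      _diskBytesSpilled += writer.bytesWritten
       writer.close()
+      _diskBytesSpilled += writer.bytesWritten
     }
     currentMap = new SizeTrackingAppendOnlyMap[K, C]
     spilledMaps.append(new DiskMapIterator(file, blockId))


The IMPORTANT NOTE above ends with a suggested alternative: batch objects and make sure
the higher-level streams can never read past a segment. Below is a minimal standalone
sketch of that idea using plain Java serialization and an in-memory buffer instead of the
file-channel rewind trick the note describes. None of this is Spark code; the object and
file names are made up.

    import java.io._

    // Toy sketch only: length-prefix each batch so the deserializer can never
    // pre-fetch into the next segment, whatever its internal buffering does.
    object BatchedSpillSketch {
      def main(args: Array[String]): Unit = {
        val file = File.createTempFile("spill-sketch", ".bin")
        val batches = Seq(Seq("a" -> 1, "b" -> 2), Seq("c" -> 3))

        // Write path: serialize each batch into a buffer, then write "length + bytes".
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
        try {
          for (batch <- batches) {
            val buffer = new ByteArrayOutputStream()
            val objOut = new ObjectOutputStream(buffer)
            objOut.writeInt(batch.size)
            batch.foreach(objOut.writeObject)
            objOut.close()
            val bytes = buffer.toByteArray
            out.writeInt(bytes.length)
            out.write(bytes)
          }
        } finally {
          out.close()
        }

        // Read path: read exactly `length` bytes per segment and deserialize from a
        // bounded in-memory stream, so the segment boundary acts as an EOF.
        val in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)))
        try {
          for (_ <- batches.indices) {
            val segment = new Array[Byte](in.readInt())
            in.readFully(segment)
            val objIn = new ObjectInputStream(new ByteArrayInputStream(segment))
            val n = objIn.readInt()
            println((1 to n).map(_ => objIn.readObject()).mkString(", "))
          }
        } finally {
          in.close()
        }
      }
    }
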
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b6fd3cd3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0086490..be548e3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec,
+    regardless of the value of `spark.io.compression.codec`.
   </td>
 </tr>
 <tr>
@@ -379,13 +381,6 @@ Apart from these, the following properties are also available, and may be useful
     Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
   </td>
 </tr>
-<tr>
-  <td>akka.x.y....</td>
-  <td>value</td>
-  <td>
-    An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
-  </td>
-</tr>
 <tr>
   <td>spark.shuffle.consolidateFiles</td>
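
The documentation change above pins spill compression to LZF; applications that would
rather not compress spilled shuffle data at all can disable it through SparkConf. A small
usage sketch; the master URL, app name, and chosen values are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Placeholder master/app name; spark.shuffle.spill.compress defaults to true.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("spill-compression-example")
      .set("spark.shuffle.spill.compress", "false")
    val sc = new SparkContext(conf)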