Repository: spark
Updated Branches:
  refs/heads/branch-1.1 eb62094e8 -> 5e191fac0
[SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress and spark.shuffle.spill.compress settings are different

This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the `spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have different values.

The problem is that sort-based shuffle's read and write paths use different settings for determining whether to apply compression. ExternalSorter writes runs to files using `TempBlockId` ids, which causes `spark.shuffle.spill.compress` to be used for enabling compression, but these spilled files end up being shuffled over the network and read as shuffle files using `ShuffleBlockId` by BlockStoreShuffleFetcher, which causes `spark.shuffle.compress` to be used for enabling decompression. As a result, this leads to errors when these settings disagree.

Based on the discussions in #2247 and #2178, it sounds like we don't want to remove the `spark.shuffle.spill.compress` setting. Therefore, I've tried to come up with a fix where `spark.shuffle.spill.compress` is used to compress data that's read and written locally and `spark.shuffle.compress` is used to compress any data that will be fetched / read as shuffle blocks.

To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and `TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and `spark.shuffle.compress`, respectively. ExternalAppendOnlyMap also uses temp blocks for spilling data. It looks like ExternalSorter was designed to be a generic sorter but its configuration already happens to be tied to sort-based shuffle, so I think it's fine if we use `spark.shuffle.compress` to compress its spills; we can move the compression configuration to the constructor in a later commit if we find that ExternalSorter is being used in other contexts where we want different configuration options to control compression.

To summarize:

**Before:**

|       | ExternalAppendOnlyMap        | ExternalSorter               |
|-------|------------------------------|------------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress       |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |

**After:**

|       | ExternalAppendOnlyMap        | ExternalSorter         |
|-------|------------------------------|------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |

Thanks to andrewor14 for debugging this with me!

Author: Josh Rosen <[email protected]>

Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits:

1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.
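To make the failure mode concrete, here is a minimal standalone sketch (not part of this commit; the object name is hypothetical and the settings are illustrative) that mirrors the regression test added to ShuffleSuite in the diff below. It runs sort-based shuffle with `spark.shuffle.spill.compress` and `spark.shuffle.compress` deliberately disagreeing, the combination that crashed before this fix:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // needed in Spark 1.x for the implicit PairRDDFunctions conversion

object Spark3426Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SPARK-3426 repro")
      .setMaster("local")
      .set("spark.shuffle.manager", "sort")          // use sort-based shuffle
      .set("spark.shuffle.spill.compress", "true")   // spilled runs are written with compression enabled...
      .set("spark.shuffle.compress", "false")        // ...but shuffle blocks are read back without it
      .set("spark.shuffle.memoryFraction", "0.001")  // tiny buffer so ExternalSorter actually spills
    val sc = new SparkContext(conf)
    try {
      // Without the fix, this shuffle fails because the spilled files are written under one
      // compression setting but read back (as shuffle blocks) under the other.
      sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
    } finally {
      sc.stop()
    }
  }
}
```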
Conflicts:
	core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e191fac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e191fac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e191fac

Branch: refs/heads/branch-1.1
Commit: 5e191fac0e27f69cbf92970e9988206289564ac4
Parents: eb62094
Author: Josh Rosen <[email protected]>
Authored: Wed Oct 22 14:49:58 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Wed Oct 22 15:10:14 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockId.scala      | 11 ++++++---
 .../org/apache/spark/storage/BlockManager.scala |  3 ++-
 .../apache/spark/storage/DiskBlockManager.scala | 17 ++++++++++----
 .../util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../spark/util/collection/ExternalSorter.scala  | 15 ++++++++++--
 .../scala/org/apache/spark/ShuffleSuite.scala   | 24 ++++++++++++++++++++
 6 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e191fac/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index c1756ac..42cec87 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -78,9 +78,14 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def name = "input-" + streamId + "-" + uniqueId
 }
 
-/** Id associated with temporary data managed as blocks. Not serializable. */
-private[spark] case class TempBlockId(id: UUID) extends BlockId {
-  def name = "temp_" + id
+/** Id associated with temporary local data managed as blocks. Not serializable. */
+private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
+  def name = "temp_local_" + id
+}
+
+/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
+private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
+  def name = "temp_shuffle_" + id
 }
 
 // Intended only for testing purposes

http://git-wip-us.apache.org/repos/asf/spark/blob/5e191fac/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 12a92d4..b9501c3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1017,7 +1017,8 @@ private[spark] class BlockManager(
       case _: ShuffleBlockId => compressShuffle
       case _: BroadcastBlockId => compressBroadcast
       case _: RDDBlockId => compressRdds
-      case _: TempBlockId => compressShuffleSpill
+      case _: TempLocalBlockId => compressShuffleSpill
+      case _: TempShuffleBlockId => compressShuffle
       case _ => false
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5e191fac/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ec022ce..cbecd06 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -122,11 +122,20 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
     getAllFiles().map(f => BlockId(f.getName))
   }
 
-  /** Produces a unique block id and File suitable for intermediate results. */
-  def createTempBlock(): (TempBlockId, File) = {
-    var blockId = new TempBlockId(UUID.randomUUID())
+  /** Produces a unique block id and File suitable for storing local intermediate results. */
+  def createTempLocalBlock(): (TempLocalBlockId, File) = {
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
     while (getFile(blockId).exists()) {
-      blockId = new TempBlockId(UUID.randomUUID())
+      blockId = new TempLocalBlockId(UUID.randomUUID())
+    }
+    (blockId, getFile(blockId))
+  }
+
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
+  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
     (blockId, getFile(blockId))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5e191fac/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8a015c1..3e09c25 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -176,7 +176,7 @@ class ExternalAppendOnlyMap[K, V, C](
     val threadId = Thread.currentThread().getId
     logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
       curWriteMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/5e191fac/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 43bbc68..3136306 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -38,6 +38,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
  *
  * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
  *
+ * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied
+ * to its use in sort-based shuffle (for example, its block compression is controlled by
+ * `spark.shuffle.compress`). We may need to revisit this if ExternalSorter is used in other
+ * non-shuffle contexts where we might want to use different configuration settings.
+ *
  * @param aggregator optional Aggregator with combine functions to use for merging data
  * @param partitioner optional Partitioner; if given, sort by partition ID and then key
  * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
@@ -297,7 +302,10 @@ private[spark] class ExternalSorter[K, V, C](
   private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     assert(!bypassMergeSort)
 
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    // Because these files may be read during shuffle, their compression must be controlled by
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more context.
+    val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0   // Objects written since the last flush
@@ -376,7 +384,10 @@ private[spark] class ExternalSorter[K, V, C](
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
       partitionWriters = Array.fill(numPartitions) {
-        val (blockId, file) = diskBlockManager.createTempBlock()
+        // Because these files may be read during shuffle, their compression must be controlled by
+        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+        // createTempShuffleBlock here; see SPARK-3426 for more context.
+        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5e191fac/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index b13ddf9..704b382 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -242,6 +242,30 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
+
+  test("shuffle with different compression settings (SPARK-3426)") {
+    for (
+      shuffleSpillCompress <- Set(true, false);
+      shuffleCompress <- Set(true, false)
+    ) {
+      val conf = new SparkConf()
+        .setAppName("test")
+        .setMaster("local")
+        .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
+        .set("spark.shuffle.compress", shuffleCompress.toString)
+        .set("spark.shuffle.memoryFraction", "0.001")
+      resetSparkContext()
+      sc = new SparkContext(conf)
+      try {
+        sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+      } catch {
+        case e: Exception =>
+          val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
+            s" spark.shuffle.compress=$shuffleCompress"
+          throw new Exception(errMsg, e)
+      }
+    }
+  }
 }
 
 object ShuffleSuite {
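For local verification, running the updated ShuffleSuite exercises all four combinations of the two compression settings. Assuming the stock sbt launcher shipped with branch-1.1 (this command is an assumption and is not part of the commit), something like `sbt/sbt "core/test-only org.apache.spark.ShuffleSuite"` should run the suite, including the new SPARK-3426 regression test.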
