Repository: spark
Updated Branches:
  refs/heads/master 97cf19f64 -> 813effc70


[SPARK-3426] Fix sort-based shuffle error when spark.shuffle.compress and 
spark.shuffle.spill.compress settings are different

This PR fixes SPARK-3426, an issue where sort-based shuffle crashes if the
`spark.shuffle.spill.compress` and `spark.shuffle.compress` settings have
different values.

The problem is that sort-based shuffle's read and write paths use different
settings to decide whether to apply compression.  ExternalSorter writes runs to
files using `TempBlockId` ids, so `spark.shuffle.spill.compress` controls
whether the data is compressed on write; but these spilled files end up being
shuffled over the network and read as shuffle files using `ShuffleBlockId` by
BlockStoreShuffleFetcher, so `spark.shuffle.compress` controls whether they are
decompressed on read.  When the two settings disagree, the reader's
(de)compression choice no longer matches the writer's and the shuffle fails.
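
To make the mismatch concrete, here is a condensed sketch of the pre-fix
mapping from block id type to compression setting (the real logic is
BlockManager's `shouldCompress`, shown in the diff below; the standalone method
signature here is simplified for illustration):

```scala
// Illustrative sketch only -- a condensed view of the pre-fix behaviour, not
// the literal BlockManager code.
def shouldCompress(blockId: BlockId, conf: SparkConf): Boolean = blockId match {
  // Write path: ExternalSorter spills runs under TempBlockId, so the writer
  // consults spark.shuffle.spill.compress...
  case _: TempBlockId    => conf.getBoolean("spark.shuffle.spill.compress", true)
  // ...but the read path fetches the same bytes as ShuffleBlockId, so the
  // reader consults spark.shuffle.compress instead. If the two settings
  // differ, the reader's decompression choice does not match what was written.
  case _: ShuffleBlockId => conf.getBoolean("spark.shuffle.compress", true)
  case _                 => false
}
```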

Based on the discussions in #2247 and #2178, it sounds like we don't want to
remove the `spark.shuffle.spill.compress` setting.  Therefore, I've tried to
come up with a fix where `spark.shuffle.spill.compress` is used to compress
data that's read and written locally and `spark.shuffle.compress` is used to
compress any data that will be fetched / read as shuffle blocks.

To do this, I split `TempBlockId` into two new id types, `TempLocalBlockId` and
`TempShuffleBlockId`, which map to `spark.shuffle.spill.compress` and
`spark.shuffle.compress`, respectively.  ExternalAppendOnlyMap also uses temp
blocks for spilling data; its spills are only read back locally, so it keeps
using `spark.shuffle.spill.compress` (now via `TempLocalBlockId`).
ExternalSorter was designed to be a generic sorter, but its configuration is
already tied to sort-based shuffle, so I think it's fine to use
`spark.shuffle.compress` to compress its spills; if we later find ExternalSorter
being used in other contexts where different settings should control
compression, we can move the compression configuration to its constructor.  To
summarize:

**Before:**

|       | ExternalAppendOnlyMap        | ExternalSorter               |
|-------|------------------------------|------------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress       |
| Write | spark.shuffle.spill.compress | spark.shuffle.spill.compress |

**After:**

|       | ExternalAppendOnlyMap        | ExternalSorter         |
|-------|------------------------------|------------------------|
| Read  | spark.shuffle.spill.compress | spark.shuffle.compress |
| Write | spark.shuffle.spill.compress | spark.shuffle.compress |
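
For reference, the previously failing behaviour can be exercised with a small
job like the sketch below. It mirrors the regression test added in this patch;
the app name is arbitrary and the explicit `spark.shuffle.manager=sort` setting
is an assumption added here for illustration (the test suite selects the
shuffle manager elsewhere):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (groupByKey)

// Any combination where the two compression settings differ used to fail for
// sort-based shuffle before this fix.
val conf = new SparkConf()
  .setAppName("spark-3426-repro")                // hypothetical app name
  .setMaster("local")
  .set("spark.shuffle.manager", "sort")          // assumed: use sort-based shuffle
  .set("spark.shuffle.spill.compress", "true")
  .set("spark.shuffle.compress", "false")
  .set("spark.shuffle.memoryFraction", "0.001")  // tiny fraction forces spills
val sc = new SparkContext(conf)
try {
  // groupByKey shuffles the spilled data; before the fix the read side could
  // not decode these blocks when the two settings disagreed.
  sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
} finally {
  sc.stop()
}
```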

Thanks to andrewor14 for debugging this with me!

Author: Josh Rosen <joshro...@databricks.com>

Closes #2890 from JoshRosen/SPARK-3426 and squashes the following commits:

1921cf6 [Josh Rosen] Minor edit for clarity.
c8dd8f2 [Josh Rosen] Add comment explaining use of createTempShuffleBlock().
2c687b9 [Josh Rosen] Fix SPARK-3426.
91e7e40 [Josh Rosen] Combine tests into single test of all combinations
76ca65e [Josh Rosen] Add regression test for SPARK-3426.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/813effc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/813effc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/813effc7

Branch: refs/heads/master
Commit: 813effc701fc27121c6f23ab32882932016fdbe0
Parents: 97cf19f
Author: Josh Rosen <joshro...@databricks.com>
Authored: Wed Oct 22 14:49:58 2014 -0700
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Wed Oct 22 14:49:58 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockId.scala      | 11 ++++++---
 .../org/apache/spark/storage/BlockManager.scala |  3 ++-
 .../apache/spark/storage/DiskBlockManager.scala | 17 ++++++++++----
 .../util/collection/ExternalAppendOnlyMap.scala |  2 +-
 .../spark/util/collection/ExternalSorter.scala  | 15 ++++++++++--
 .../scala/org/apache/spark/ShuffleSuite.scala   | 24 ++++++++++++++++++++
 6 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/813effc7/core/src/main/scala/org/apache/spark/storage/BlockId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index a83a3f4..8df5ec6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -83,9 +83,14 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
   def name = "input-" + streamId + "-" + uniqueId
 }
 
-/** Id associated with temporary data managed as blocks. Not serializable. */
-private[spark] case class TempBlockId(id: UUID) extends BlockId {
-  def name = "temp_" + id
+/** Id associated with temporary local data managed as blocks. Not serializable. */
+private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
+  def name = "temp_local_" + id
+}
+
+/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
+private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
+  def name = "temp_shuffle_" + id
 }
 
 // Intended only for testing purposes

http://git-wip-us.apache.org/repos/asf/spark/blob/813effc7/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0ce2a3f..4cc9792 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1071,7 +1071,8 @@ private[spark] class BlockManager(
       case _: ShuffleBlockId => compressShuffle
       case _: BroadcastBlockId => compressBroadcast
       case _: RDDBlockId => compressRdds
-      case _: TempBlockId => compressShuffleSpill
+      case _: TempLocalBlockId => compressShuffleSpill
+      case _: TempShuffleBlockId => compressShuffle
       case _ => false
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/813effc7/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index a715594..6633a1d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -98,11 +98,20 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     getAllFiles().map(f => BlockId(f.getName))
   }
 
-  /** Produces a unique block id and File suitable for intermediate results. */
-  def createTempBlock(): (TempBlockId, File) = {
-    var blockId = new TempBlockId(UUID.randomUUID())
+  /** Produces a unique block id and File suitable for storing local intermediate results. */
+  def createTempLocalBlock(): (TempLocalBlockId, File) = {
+    var blockId = new TempLocalBlockId(UUID.randomUUID())
     while (getFile(blockId).exists()) {
-      blockId = new TempBlockId(UUID.randomUUID())
+      blockId = new TempLocalBlockId(UUID.randomUUID())
+    }
+    (blockId, getFile(blockId))
+  }
+
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
+  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
+    var blockId = new TempShuffleBlockId(UUID.randomUUID())
+    while (getFile(blockId).exists()) {
+      blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
     (blockId, getFile(blockId))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/813effc7/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0c088da..26fa0cb 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -153,7 +153,7 @@ class ExternalAppendOnlyMap[K, V, C](
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   override protected[this] def spill(collection: SizeTracker): Unit = {
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
       curWriteMetrics)

http://git-wip-us.apache.org/repos/asf/spark/blob/813effc7/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index d1b06d1..c1ce136 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -38,6 +38,11 @@ import org.apache.spark.storage.{BlockObjectWriter, BlockId}
  *
  * If combining is disabled, the type C must equal V -- we'll cast the objects at the end.
  *
+ * Note: Although ExternalSorter is a fairly generic sorter, some of its configuration is tied
+ * to its use in sort-based shuffle (for example, its block compression is controlled by
+ * `spark.shuffle.compress`).  We may need to revisit this if ExternalSorter is used in other
+ * non-shuffle contexts where we might want to use different configuration settings.
+ *
  * @param aggregator optional Aggregator with combine functions to use for merging data
  * @param partitioner optional Partitioner; if given, sort by partition ID and then key
  * @param ordering optional Ordering to sort keys within each partition; should be a total ordering
@@ -259,7 +264,10 @@ private[spark] class ExternalSorter[K, V, C](
   private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
     assert(!bypassMergeSort)
 
-    val (blockId, file) = diskBlockManager.createTempBlock()
+    // Because these files may be read during shuffle, their compression must be controlled by
+    // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+    // createTempShuffleBlock here; see SPARK-3426 for more context.
+    val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0   // Objects written since the last flush
@@ -338,7 +346,10 @@ private[spark] class ExternalSorter[K, V, C](
     if (partitionWriters == null) {
       curWriteMetrics = new ShuffleWriteMetrics()
       partitionWriters = Array.fill(numPartitions) {
-        val (blockId, file) = diskBlockManager.createTempBlock()
+        // Because these files may be read during shuffle, their compression must be controlled by
+        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+        // createTempShuffleBlock here; see SPARK-3426 for more context.
+        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
         blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/813effc7/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 15aa4d8..2bdd84c 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -242,6 +242,30 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     assert(thrown.getClass === classOf[SparkException])
     assert(thrown.getMessage.toLowerCase.contains("serializable"))
   }
+
+  test("shuffle with different compression settings (SPARK-3426)") {
+    for (
+      shuffleSpillCompress <- Set(true, false);
+      shuffleCompress <- Set(true, false)
+    ) {
+      val conf = new SparkConf()
+        .setAppName("test")
+        .setMaster("local")
+        .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
+        .set("spark.shuffle.compress", shuffleCompress.toString)
+        .set("spark.shuffle.memoryFraction", "0.001")
+      resetSparkContext()
+      sc = new SparkContext(conf)
+      try {
+        sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+      } catch {
+        case e: Exception =>
+          val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
+            s" spark.shuffle.compress=$shuffleCompress"
+          throw new Exception(errMsg, e)
+      }
+    }
+  }
 }
 
 object ShuffleSuite {

