Repository: spark
Updated Branches:
  refs/heads/master 49da38e5f -> e04811137


[SPARK-10466] [SQL] UnsafeRow SerDe exception with data spill

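Spilling data with UnsafeRow causes an assertion failure in `UnsafeRowSerializer.writeKey`, which asserts that the key is an `Int` but can be passed a null key.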

```
java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:165)
        at 
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
        at 
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
        at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
        at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
        at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
```

To reproduce the failure (thanks andrewor14):
```scala
bin/spark-shell --master local \
  --conf spark.shuffle.memoryFraction=0.005 \
  --conf spark.shuffle.sort.bypassMergeThreshold=0

sc.parallelize(1 to 2 * 1000 * 1000, 10)
  .map { i => (i, i) }.toDF("a", "b").groupBy("b").avg().count()
```

Author: Cheng Hao <hao.ch...@intel.com>

Closes #8635 from chenghao-intel/unsafe_spill.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0481113
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0481113
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0481113

Branch: refs/heads/master
Commit: e04811137680f937669cdcc78771227aeb7cd849
Parents: 49da38e
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Thu Sep 10 11:48:43 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Sep 10 11:48:43 2015 -0700

----------------------------------------------------------------------
 .../spark/util/collection/ExternalSorter.scala  |  6 ++
 .../sql/execution/UnsafeRowSerializer.scala     |  2 +-
 .../execution/UnsafeRowSerializerSuite.scala    | 64 ++++++++++++++++++--
 3 files changed, 67 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala 
b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 19287ed..138c05d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -188,6 +188,12 @@ private[spark] class ExternalSorter[K, V, C](
 
   private val spills = new ArrayBuffer[SpilledFile]
 
+  /**
+   * Number of files this sorter has spilled so far.
+   * Exposed for testing.
+   */
+  private[spark] def numSpills: Int = spills.size
+
   override def insertAll(records: Iterator[Product2[K, V]]): Unit = {
     // TODO: stop combining if we find that the reduction factor isn't high
     val shouldCombine = aggregator.isDefined

http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 5c18558..e060c06 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -72,7 +72,7 @@ private class UnsafeRowSerializerInstance(numFields: Int) 
extends SerializerInst
     override def writeKey[T: ClassTag](key: T): SerializationStream = {
       // The key is only needed on the map side when computing partition ids. 
It does not need to
       // be shuffled.
-      assert(key.isInstanceOf[Int])
+      assert(null == key || key.isInstanceOf[Int])
       this
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0481113/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index bd02c73..0113d05 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{DataOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{File, DataOutputStream, ByteArrayInputStream, 
ByteArrayOutputStream}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.util.collection.ExternalSorter
+import org.apache.spark.util.Utils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.types._
+import org.apache.spark._
 
 
 /**
@@ -40,9 +44,15 @@ class ClosableByteArrayInputStream(buf: Array[Byte]) extends 
ByteArrayInputStrea
 class UnsafeRowSerializerSuite extends SparkFunSuite {
 
   private def toUnsafeRow(row: Row, schema: Array[DataType]): UnsafeRow = {
-    val internalRow = 
CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow]
+    val converter = unsafeRowConverter(schema)
+    converter(row)
+  }
+
+  private def unsafeRowConverter(schema: Array[DataType]): Row => UnsafeRow = {
     val converter = UnsafeProjection.create(schema)
-    converter.apply(internalRow)
+    (row: Row) => {
+      
converter(CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[InternalRow])
+    }
   }
 
   test("toUnsafeRow() test helper method") {
@@ -87,4 +97,50 @@ class UnsafeRowSerializerSuite extends SparkFunSuite {
     assert(!deserializerIter.hasNext)
     assert(input.closed)
   }
+
+  test("SPARK-10466: external sorter spilling with unsafe row serializer") {
+    var sc: SparkContext = null
+    var outputFile: File = null
+    val oldEnv = SparkEnv.get // save the old SparkEnv, as it will be 
overwritten
+    Utils.tryWithSafeFinally {
+      val conf = new SparkConf()
+        .set("spark.shuffle.spill.initialMemoryThreshold", "1024")
+        .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+        .set("spark.shuffle.memoryFraction", "0.0001")
+
+      sc = new SparkContext("local", "test", conf)
+      outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")
+      // prepare data
+      val converter = unsafeRowConverter(Array(IntegerType))
+      val data = (1 to 1000).iterator.map { i =>
+        (i, converter(Row(i)))
+      }
+      val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
+        partitioner = Some(new HashPartitioner(10)),
+        serializer = Some(new UnsafeRowSerializer(numFields = 1)))
+
+      // Ensure we spilled something and have to merge them later
+      assert(sorter.numSpills === 0)
+      sorter.insertAll(data)
+      assert(sorter.numSpills > 0)
+
+      // Merging spilled files should not throw assertion error
+      val taskContext =
+        new TaskContextImpl(0, 0, 0, 0, null, null, 
InternalAccumulator.create(sc))
+      taskContext.taskMetrics.shuffleWriteMetrics = Some(new 
ShuffleWriteMetrics)
+      sorter.writePartitionedFile(ShuffleBlockId(0, 0, 0), taskContext, 
outputFile)
+    } {
+      // Clean up
+      if (sc != null) {
+        sc.stop()
+      }
+
+      // restore the spark env
+      SparkEnv.set(oldEnv)
+
+      if (outputFile != null) {
+        outputFile.delete()
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to