Repository: spark
Updated Branches:
  refs/heads/branch-0.9 6634a348a -> 51f677eb9


SPARK-2043: ExternalAppendOnlyMap doesn't always find matching keys

The current implementation reads one key with the next hash code as it finishes 
reading the keys with the current hash code, which may cause it to miss some 
matches of the next key. This can cause operations like join to give the wrong 
result when reduce tasks spill to disk and there are hash collisions, as values 
won't be matched together. This PR fixes it by not reading in that next key, 
using a peeking iterator instead.

Author: Matei Zaharia <[email protected]>

Closes #986 from mateiz/spark-2043 and squashes the following commits:

0959514 [Matei Zaharia] Added unit test for having many hash collisions
892debb [Matei Zaharia] SPARK-2043: don't read a key with the next hash code in 
ExternalAppendOnlyMap, instead use a buffered iterator to only read values with 
the current hash code.

(cherry picked from commit b45c13e7d798f97b92f1a6329528191b8d779c4f)
Signed-off-by: Matei Zaharia <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51f677eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51f677eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51f677eb

Branch: refs/heads/branch-0.9
Commit: 51f677eb9a4fb2cbef8c62b69b4af3ff02560ac5
Parents: 6634a34
Author: Matei Zaharia <[email protected]>
Authored: Thu Jun 5 23:01:48 2014 -0700
Committer: Matei Zaharia <[email protected]>
Committed: Thu Jun 5 23:02:11 2014 -0700

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 10 +++--
 .../collection/ExternalAppendOnlyMapSuite.scala | 39 +++++++++++++++++++-
 2 files changed, 44 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51f677eb/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f36817..b4e6824 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
+import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -230,7 +231,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
     private val sortedMap = currentMap.destructiveSortedIterator(comparator)
-    private val inputStreams = Seq(sortedMap) ++ spilledMaps
+    private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
@@ -245,13 +246,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     * In the event of key hash collisions, this ensures no pairs are hidden from being merged.
     * Assume the given iterator is in sorted order.
     */
-    private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
+    private def getMorePairs(it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
       if (it.hasNext) {
         var kc = it.next()
         kcPairs += kc
         val minHash = kc._1.hashCode()
-        while (it.hasNext && kc._1.hashCode() == minHash) {
+        while (it.hasNext && it.head._1.hashCode() == minHash) {
           kc = it.next()
           kcPairs += kc
         }
@@ -324,7 +325,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
      *
     * StreamBuffers are ordered by the minimum key hash found across all of their own pairs.
     */
-    private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])
+    private class StreamBuffer(
+        val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)])
       extends Comparable[StreamBuffer] {
 
       def isEmpty = pairs.length == 0

http://git-wip-us.apache.org/repos/asf/spark/blob/51f677eb/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fce1184..8675e0d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -277,6 +277,11 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       ("pomatoes", "eructation")      // 568647356
     )
 
+    collisionPairs.foreach { case (w1, w2) =>
+      // String.hashCode is documented to use a specific algorithm, but check just in case
+      assert(w1.hashCode === w2.hashCode)
+    }
+
     (1 to 100000).map(_.toString).foreach { i => map.insert(i, i) }
     collisionPairs.foreach { case (w1, w2) =>
       map.insert(w1, w2)
@@ -296,7 +301,32 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       assert(kv._2.equals(expectedValue))
       count += 1
     }
-    assert(count == 100000 + collisionPairs.size * 2)
+    assert(count === 100000 + collisionPairs.size * 2)
+  }
+
+  test("spilling with many hash collisions") {
+    val conf = new SparkConf(true)
+    conf.set("spark.shuffle.memoryFraction", "0.0001")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", conf)
+
+    val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
+
+    // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
+    // problems if the map fails to group together the objects with the same code (SPARK-2043).
+    for (i <- 1 to 10) {
+      for (j <- 1 to 10000) {
+        map.insert(FixedHashObject(j, j % 2), 1)
+      }
+    }
+
+    val it = map.iterator
+    var count = 0
+    while (it.hasNext) {
+      val kv = it.next()
+      assert(kv._2 === 10)
+      count += 1
+    }
+    assert(count === 10000)
   }
 
   test("spilling with hash collisions using the Int.MaxValue key") {
@@ -317,3 +347,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     }
   }
 }
+
+/**
+ * A dummy class that always returns the same hash code, to easily test hash collisions
+ */
+case class FixedHashObject(val v: Int, val h: Int) extends Serializable {
+  override def hashCode(): Int = h
+}

Reply via email to