Repository: spark
Updated Branches:
  refs/heads/branch-1.1 779d1eb26 -> f66f260bb


[SQL] [SPARK-2826] Reduce the memory copy while building the hashmap for 
HashOuterJoin

This is a follow-up to #1147. This PR improves performance by about 10% - 15% 
in my local tests.
```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, I also did some cleanup as suggested in 
#1147.

Author: Cheng Hao <[email protected]>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the 
following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap

(cherry picked from commit 5d54d71ddbac1fbb26925a8c9138bbb8c0e81db8)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f66f260b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f66f260b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f66f260b

Branch: refs/heads/branch-1.1
Commit: f66f260bbede1eb4e4133918812700baa252fba8
Parents: 779d1eb
Author: Cheng Hao <[email protected]>
Authored: Mon Aug 11 20:45:14 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Mon Aug 11 20:45:24 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/joins.scala  | 54 ++++++++++----------
 1 file changed, 28 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f66f260b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index ea075f8..c86811e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.{HashMap => JavaHashMap}
+
 import scala.collection.mutable.{ArrayBuffer, BitSet}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
@@ -137,14 +139,6 @@ trait HashJoin {
 }
 
 /**
- * Constant Value for Binary Join Node
- */
-object HashOuterJoin {
-  val DUMMY_LIST = Seq[Row](null)
-  val EMPTY_LIST = Seq[Row]()
-}
-
-/**
  * :: DeveloperApi ::
  * Performs a hash based outer join for two child relations by shuffling the 
data using 
  * the join keys. This operator requires loading the associated partition in 
both side into memory.
@@ -181,6 +175,9 @@ case class HashOuterJoin(
     }
   }
 
+  @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
+  @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
+
   // TODO we need to rewrite all of the iterators with our own implementation 
instead of the Scala
   // iterator for performance purpose. 
 
@@ -199,8 +196,8 @@ case class HashOuterJoin(
         joinedRow.copy
       } else {
         Nil
-      }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to 
add additional row,
+      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional 
row,
         // as we don't know whether we need to append it until finish 
iterating all of the 
         // records in right side.
         // If we didn't get any proper row, then append a single row with 
empty right
@@ -224,8 +221,8 @@ case class HashOuterJoin(
         joinedRow.copy
       } else {
         Nil
-      }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
-        // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to 
add additional row,
+      }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+        // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional 
row,
         // as we don't know whether we need to append it until finish 
iterating all of the 
         // records in left side.
         // If we didn't get any proper row, then append a single row with 
empty left.
@@ -259,10 +256,10 @@ case class HashOuterJoin(
             rightMatchedSet.add(idx)
             joinedRow.copy
           }
-        } ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+        } ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
           // 2. For those unmatched records in left, append additional records 
with empty right.
 
-          // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to 
add additional row,
+          // DUMMY_LIST.filter(_ => !matched) is a tricky way to add 
additional row,
           // as we don't know whether we need to append it until finish 
iterating all 
           // of the records in right side.
           // If we didn't get any proper row, then append a single row with 
empty right.
@@ -287,18 +284,22 @@ case class HashOuterJoin(
   }
 
   private[this] def buildHashTable(
-      iter: Iterator[Row], keyGenerator: Projection): Map[Row, 
ArrayBuffer[Row]] = {
-    // TODO: Use Spark's HashMap implementation.
-    val hashTable = scala.collection.mutable.Map[Row, ArrayBuffer[Row]]()
+      iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, 
ArrayBuffer[Row]] = {
+    val hashTable = new JavaHashMap[Row, ArrayBuffer[Row]]()
     while (iter.hasNext) {
       val currentRow = iter.next()
       val rowKey = keyGenerator(currentRow)
 
-      val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new 
ArrayBuffer[Row]()})
+      var existingMatchList = hashTable.get(rowKey)
+      if (existingMatchList == null) {
+        existingMatchList = new ArrayBuffer[Row]()
+        hashTable.put(rowKey, existingMatchList)
+      }
+
       existingMatchList += currentRow.copy()
     }
-    
-    hashTable.toMap[Row, ArrayBuffer[Row]]
+
+    hashTable
   }
 
   def execute() = {
@@ -309,21 +310,22 @@ case class HashOuterJoin(
       // Build HashMap for current partition in right relation
       val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, 
right.output))
 
+      import scala.collection.JavaConversions._
       val boundCondition = 
         condition.map(newPredicate(_, left.output ++ 
right.output)).getOrElse((row: Row) => true)
       joinType match {
         case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
-          leftOuterIterator(key, leftHashTable.getOrElse(key, 
HashOuterJoin.EMPTY_LIST), 
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+          leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), 
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case RightOuter => rightHashTable.keysIterator.flatMap { key =>
-          rightOuterIterator(key, leftHashTable.getOrElse(key, 
HashOuterJoin.EMPTY_LIST), 
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+          rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), 
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case FullOuter => (leftHashTable.keySet ++ 
rightHashTable.keySet).iterator.flatMap { key =>
           fullOuterIterator(key, 
-            leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST), 
-            rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+            leftHashTable.getOrElse(key, EMPTY_LIST), 
+            rightHashTable.getOrElse(key, EMPTY_LIST))
         }
         case x => throw new Exception(s"HashOuterJoin should not take $x as 
the JoinType")
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to