Repository: spark
Updated Branches:
  refs/heads/master 17275fa99 -> 8df584b02


[SPARK-11982] [SQL] improve performance of cartesian product

This PR improve the performance of CartesianProduct by caching the result of 
right plan.

After this patch, the query time of TPC-DS Q65 go down to 4 seconds from 28 
minutes (420X faster).

cc nongli

Author: Davies Liu <[email protected]>

Closes #9969 from davies/improve_cartesian.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8df584b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8df584b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8df584b0

Branch: refs/heads/master
Commit: 8df584b0200402d8b2ce0a8de24f7a760ced8655
Parents: 17275fa
Author: Davies Liu <[email protected]>
Authored: Mon Nov 30 11:54:18 2015 -0800
Committer: Davies Liu <[email protected]>
Committed: Mon Nov 30 11:54:18 2015 -0800

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 63 ++++++++++++++++
 .../unsafe/sort/UnsafeInMemorySorter.java       |  7 ++
 .../sql/execution/joins/CartesianProduct.scala  | 76 +++++++++++++++++---
 .../sql/execution/metric/SQLMetricsSuite.scala  |  2 +-
 4 files changed, 139 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8df584b0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 2e40312..5a97f4f 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.Queue;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -521,4 +522,66 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       return upstream.getKeyPrefix();
     }
   }
+
+  /**
+   * Returns a iterator, which will return the rows in the order as inserted.
+   *
+   * It is the caller's responsibility to call `cleanupResources()`
+   * after consuming this iterator.
+   */
+  public UnsafeSorterIterator getIterator() throws IOException {
+    if (spillWriters.isEmpty()) {
+      assert(inMemSorter != null);
+      return inMemSorter.getIterator();
+    } else {
+      LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
+      for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
+        queue.add(spillWriter.getReader(blockManager));
+      }
+      if (inMemSorter != null) {
+        queue.add(inMemSorter.getIterator());
+      }
+      return new ChainedIterator(queue);
+    }
+  }
+
+  /**
+   * Chain multiple UnsafeSorterIterator together as single one.
+   */
+  class ChainedIterator extends UnsafeSorterIterator {
+
+    private final Queue<UnsafeSorterIterator> iterators;
+    private UnsafeSorterIterator current;
+
+    public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
+      assert iterators.size() > 0;
+      this.iterators = iterators;
+      this.current = iterators.remove();
+    }
+
+    @Override
+    public boolean hasNext() {
+      while (!current.hasNext() && !iterators.isEmpty()) {
+        current = iterators.remove();
+      }
+      return current.hasNext();
+    }
+
+    @Override
+    public void loadNext() throws IOException {
+      current.loadNext();
+    }
+
+    @Override
+    public Object getBaseObject() { return current.getBaseObject(); }
+
+    @Override
+    public long getBaseOffset() { return current.getBaseOffset(); }
+
+    @Override
+    public int getRecordLength() { return current.getRecordLength(); }
+
+    @Override
+    public long getKeyPrefix() { return current.getKeyPrefix(); }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8df584b0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index dce1f15..c91e88f 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -226,4 +226,11 @@ public final class UnsafeInMemorySorter {
     sorter.sort(array, 0, pos / 2, sortComparator);
     return new SortedIterator(pos / 2);
   }
+
+  /**
+   * Returns an iterator over record pointers in original order (inserted).
+   */
+  public SortedIterator getIterator() {
+    return new SortedIterator(pos / 2);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8df584b0/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index f467519..fa2bc76 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -17,16 +17,75 @@
 
 package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark._
+import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.CompletionIterator
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+
+/**
+  * An optimized CartesianRDD for UnsafeRow, which will cache the rows from 
second child RDD,
+  * will be much faster than building the right partition for every row in 
left RDD, it also
+  * materialize the right RDD (in case of the right RDD is nondeterministic).
+  */
+private[spark]
+class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], 
numFieldsOfRight: Int)
+  extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+    // We will not sort the rows, so prefixComparator and recordComparator are 
null.
+    val sorter = UnsafeExternalSorter.create(
+      context.taskMemoryManager(),
+      SparkEnv.get.blockManager,
+      context,
+      null,
+      null,
+      1024,
+      SparkEnv.get.memoryManager.pageSizeBytes)
+
+    val partition = split.asInstanceOf[CartesianPartition]
+    for (y <- rdd2.iterator(partition.s2, context)) {
+      sorter.insertRecord(y.getBaseObject, y.getBaseOffset, y.getSizeInBytes, 
0)
+    }
+
+    // Create an iterator from sorter and wrapper it as Iterator[UnsafeRow]
+    def createIter(): Iterator[UnsafeRow] = {
+      val iter = sorter.getIterator
+      val unsafeRow = new UnsafeRow
+      new Iterator[UnsafeRow] {
+        override def hasNext: Boolean = {
+          iter.hasNext
+        }
+        override def next(): UnsafeRow = {
+          iter.loadNext()
+          unsafeRow.pointTo(iter.getBaseObject, iter.getBaseOffset, 
numFieldsOfRight,
+            iter.getRecordLength)
+          unsafeRow
+        }
+      }
+    }
+
+    val resultIter =
+      for (x <- rdd1.iterator(partition.s1, context);
+           y <- createIter()) yield (x, y)
+    CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, 
UnsafeRow)]](
+      resultIter, sorter.cleanupResources)
+  }
+}
 
 
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends 
BinaryNode {
   override def output: Seq[Attribute] = left.output ++ right.output
 
+  override def canProcessSafeRows: Boolean = false
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override private[sql] lazy val metrics = Map(
     "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left 
rows"),
     "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
right rows"),
@@ -39,18 +98,19 @@ case class CartesianProduct(left: SparkPlan, right: 
SparkPlan) extends BinaryNod
 
     val leftResults = left.execute().map { row =>
       numLeftRows += 1
-      row.copy()
+      row.asInstanceOf[UnsafeRow]
     }
     val rightResults = right.execute().map { row =>
       numRightRows += 1
-      row.copy()
+      row.asInstanceOf[UnsafeRow]
     }
 
-    leftResults.cartesian(rightResults).mapPartitionsInternal { iter =>
-      val joinedRow = new JoinedRow
+    val pair = new UnsafeCartesianRDD(leftResults, rightResults, 
right.output.size)
+    pair.mapPartitionsInternal { iter =>
+      val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
       iter.map { r =>
         numOutputRows += 1
-        joinedRow(r._1, r._2)
+        joiner.join(r._1, r._2)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8df584b0/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index ebfa1ea..4f2cad1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -317,7 +317,7 @@ class SQLMetricsSuite extends SparkFunSuite with 
SharedSQLContext {
       testSparkPlanMetrics(df, 1, Map(
         1L -> ("CartesianProduct", Map(
           "number of left rows" -> 12L, // left needs to be scanned twice
-          "number of right rows" -> 12L, // right is read 6 times
+          "number of right rows" -> 4L, // right is read twice
           "number of output rows" -> 12L)))
       )
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to