Repository: spark
Updated Branches:
  refs/heads/branch-1.2 27d9f13af -> a4ae7c8b5


[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark

This patch just replaces a native quick sorter with Sorter(TimSort) in Spark.
It could get performance gains by ~8% in my quick experiments.

Author: Takeshi Yamamuro <linguin....@gmail.com>

Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the 
following commits:

8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import
3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with 
Sorter(TimSort) in Spark

(cherry picked from commit 2e6b736b0e6e5920d0523533c87832a53211db42)
Signed-off-by: Ankur Dave <ankurd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4ae7c8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4ae7c8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4ae7c8b

Branch: refs/heads/branch-1.2
Commit: a4ae7c8b533b3998484879439c0982170c3c38a7
Parents: 27d9f13
Author: Takeshi Yamamuro <linguin....@gmail.com>
Authored: Sun Dec 7 19:36:08 2014 -0800
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Sun Dec 7 19:37:32 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Edge.scala    | 30 +++++++++++++++
 .../graphx/impl/EdgePartitionBuilder.scala      | 39 +++++++++++++++++---
 2 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a4ae7c8b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 7e842ec..ecc37dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.graphx
 
+import org.apache.spark.util.collection.SortDataFormat
+
 /**
  * A single directed edge consisting of a source id, target id,
  * and the data associated with the edge.
@@ -65,4 +67,32 @@ object Edge {
       else 1
     }
   }
+
+  private[graphx] def edgeArraySortDataFormat[ED] = new 
SortDataFormat[Edge[ED], Array[Edge[ED]]] {
+    override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = {
+      data(pos)
+    }
+
+    override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = {
+      val tmp = data(pos0)
+      data(pos0) = data(pos1)
+      data(pos1) = tmp
+    }
+
+    override def copyElement(
+        src: Array[Edge[ED]], srcPos: Int,
+        dst: Array[Edge[ED]], dstPos: Int) {
+      dst(dstPos) = src(srcPos)
+    }
+
+    override def copyRange(
+        src: Array[Edge[ED]], srcPos: Int,
+        dst: Array[Edge[ED]], dstPos: Int, length: Int) {
+      System.arraycopy(src, srcPos, dst, dstPos, length)
+    }
+
+    override def allocate(length: Int): Array[Edge[ED]] = {
+      new Array[Edge[ED]](length)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a4ae7c8b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index b0cb0fe..409cf60 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.graphx.impl
 
 import scala.reflect.ClassTag
-import scala.util.Sorting
-
-import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.{SortDataFormat, Sorter, 
PrimitiveVector}
 
 /** Constructs an EdgePartition from scratch. */
 private[graphx]
@@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) 
ED: ClassTag, VD: Cla
 
   def toEdgePartition: EdgePartition[ED, VD] = {
     val edgeArray = edges.trim().array
-    Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+    new Sorter(Edge.edgeArraySortDataFormat[ED])
+      .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
     val localSrcIds = new Array[Int](edgeArray.size)
     val localDstIds = new Array[Int](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
@@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[
 
   def toEdgePartition: EdgePartition[ED, VD] = {
     val edgeArray = edges.trim().array
-    Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
+    new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED])
+      .sort(edgeArray, 0, edgeArray.length, 
EdgeWithLocalIds.lexicographicOrdering)
     val localSrcIds = new Array[Int](edgeArray.size)
     val localDstIds = new Array[Int](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
@@ -140,4 +140,33 @@ private[impl] object EdgeWithLocalIds {
     }
   }
 
+  private[graphx] def edgeArraySortDataFormat[ED]
+      = new SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] {
+    override def getKey(
+        data: Array[EdgeWithLocalIds[ED]], pos: Int): EdgeWithLocalIds[ED] = {
+      data(pos)
+    }
+
+    override def swap(data: Array[EdgeWithLocalIds[ED]], pos0: Int, pos1: 
Int): Unit = {
+      val tmp = data(pos0)
+      data(pos0) = data(pos1)
+      data(pos1) = tmp
+    }
+
+    override def copyElement(
+        src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
+        dst: Array[EdgeWithLocalIds[ED]], dstPos: Int) {
+      dst(dstPos) = src(srcPos)
+    }
+
+    override def copyRange(
+        src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
+        dst: Array[EdgeWithLocalIds[ED]], dstPos: Int, length: Int) {
+      System.arraycopy(src, srcPos, dst, dstPos, length)
+    }
+
+    override def allocate(length: Int): Array[EdgeWithLocalIds[ED]] = {
+      new Array[EdgeWithLocalIds[ED]](length)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to