add Heap version for GroupByAgg.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e94a029a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e94a029a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e94a029a

Branch: refs/heads/master
Commit: e94a029a795794b94d490223559a3243e5c68950
Parents: bbc6468
Author: DO YUNG YOON <[email protected]>
Authored: Tue Jul 17 00:20:18 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Jul 17 16:49:25 2018 +0900

----------------------------------------------------------------------
 .../s2jobs/wal/BoundedPriorityQueue.scala       |  46 +++++++
 .../process/S2EdgeDataAggregateProcess.scala    |   2 +-
 .../s2jobs/wal/udafs/S2EdgeDataAggregate.scala  | 138 ++++++++++++++++++-
 .../S2EdgeDataAggregateProcessTest.scala        |  42 +++++-
 4 files changed, 217 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
new file mode 100644
index 0000000..ce03169
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
@@ -0,0 +1,46 @@
+package org.apache.s2graph.s2jobs.wal
+
+import scala.collection.generic.Growable
+import scala.collection.JavaConverters._
+import java.util.{PriorityQueue => JPriorityQueue}
+
+
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  override def size: Int = underlying.size
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear() { underlying.clear() }
+
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else {
+      false
+    }
+  }
+}
\ No newline at end of file

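The queue above holds at most maxSize elements: while there is room, offer() inserts directly; once full, a new element replaces the current minimum only if it compares greater under the implicit Ordering, so the retained contents converge to the top-K largest elements. A minimal standalone sketch (the values here are illustrative):

    import org.apache.s2graph.s2jobs.wal.BoundedPriorityQueue

    object BoundedPriorityQueueSketch {
      def main(args: Array[String]): Unit = {
        // keep only the 3 largest ints seen so far
        val pq = new BoundedPriorityQueue[Int](3)
        pq ++= Seq(5, 1, 9, 7, 3, 8)
        // iteration order of the backing java.util.PriorityQueue is
        // unspecified, so sort before printing or comparing
        println(pq.toList.sorted) // List(7, 8, 9)
      }
    }
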
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
index a1d17f1..cdc6c8c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
@@ -16,7 +16,7 @@ class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.
     val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("timestamp,to,label,props").split(",").map(col(_))
     taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
 
-    val aggregator = new GroupByAgg(maxNumOfEdges)
+    val aggregator = S2EdgeDataAggregate(maxNumOfEdges)
 
     val edges = inputMap(taskConf.inputs.head)
 

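With this change the process no longer hard-codes GroupByAgg; the companion's apply now hands back the heap-based GroupByAggOptimized. A sketch of the call site under stated assumptions: only the S2EdgeDataAggregate(maxNumOfEdges) call is taken from this diff, while the grouping key "from" and the wrapper method are illustrative:

    import org.apache.s2graph.s2jobs.wal.udafs.S2EdgeDataAggregate
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    object AggregateSketch {
      // hypothetical wrapper; mirrors the aggregateColumns default above
      def aggregateEdges(edges: DataFrame, maxNumOfEdges: Int): DataFrame = {
        val aggregator = S2EdgeDataAggregate(maxNumOfEdges) // GroupByAggOptimized
        val aggregateColumns = "timestamp,to,label,props".split(",").map(col(_))
        edges
          .groupBy(col("from"))
          .agg(aggregator(aggregateColumns: _*).as("edges"))
      }
    }
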
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
index 1b12235..dabfd99 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
@@ -5,6 +5,9 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
 
+import scala.annotation.tailrec
+import scala.collection.mutable
+
 object S2EdgeDataAggregate {
   type Element = (Long, String, String, String)
 
@@ -33,8 +36,77 @@ object S2EdgeDataAggregate {
 
   val arrayType = ArrayType(elementType = StructType(fields))
 
-  def apply(maxNumOfEdges: Int = 1000): GroupByAgg = {
-    new GroupByAgg(maxNumOfEdges)
+  def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized = {
+    new GroupByAggOptimized(maxNumOfEdges)
+  }
+
+  def swap[T](array: mutable.Seq[T], i: Int, j: Int) = {
+    val tmp = array(i)
+    array(i) = array(j)
+    array(j) = tmp
+  }
+
+  @tailrec
+  def percolateDown[T](array: mutable.Seq[T], idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    val left = 2 * idx + 1
+    val right = 2 * idx + 2
+    var smallest = idx
+
+    if (left < array.size && ordering.compare(array(left), array(smallest)) < 0) {
+      smallest = left
+    }
+
+    if (right < array.size && ordering.compare(array(right), array(smallest)) < 0) {
+      smallest = right
+    }
+
+    if (smallest != idx) {
+      swap(array, idx, smallest)
+      percolateDown(array, smallest)
+    }
+  }
+
+  def percolateUp[T](array: mutable.Seq[T],
+                     idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    var pos = idx
+    var parent = (pos - 1) / 2
+    while (parent >= 0 && ordering.compare(array(pos), array(parent)) < 0) {
+      // swap pos and parent, since array(parent) > array(pos)
+      swap(array, parent, pos)
+      pos = parent
+      parent = (pos - 1) / 2
+    }
+  }
+
+  def addToTopK[T](array: mutable.Seq[T],
+                   size: Int,
+                   newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] = {
+    // use array as minHeap to keep track of topK.
+    // parent = (i -1) / 2
+    // left child = 2 * i + 1
+    // right child = 2 * i + 2
+
+    // check if array is already full.
+    if (array.size >= size) {
+      // compare newData to min. newData < array(0)
+      val currentMin = array(0)
+      if (ordering.compare(newData, currentMin) < 0) {
+        // drop newData
+      } else {
+        // delete min
+        array(0) = newData
+        // percolate down
+        percolateDown(array, 0)
+      }
+      array
+    } else {
+      // append the new element into the sequence since there is room left.
+      val newArray = array :+ newData
+      val idx = newArray.size - 1
+      // percolate up last element
+      percolateUp(newArray, idx)
+      newArray
+    }
   }
 
   def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = {
@@ -46,7 +118,7 @@ object S2EdgeDataAggregate {
     val arr = new mutable.ArrayBuffer[T](size)
 
     while (idx < size && i < n && j < m) {
-      if (ordering.compare(cur(i), prev(j)) > 0) {
+      if (ordering.compare(cur(i), prev(j)) < 0) {
         arr += cur(i)
         i += 1
       } else {
@@ -68,8 +140,61 @@ object S2EdgeDataAggregate {
   }
 }
 
-class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  implicit val ord = rowOrdering
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  type ROWS = mutable.Seq[Row]
+
+  override def inputSchema: StructType = StructType(fields)
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, mutable.ArrayBuffer.empty[Row])
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val prev = buffer.getAs[ROWS](0)
+
+    val updated = addToTopK(prev, maxNumOfEdges, input)
+
+    buffer.update(0, updated)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    var prev = buffer1.getAs[ROWS](0)
+    val cur = buffer2.getAs[ROWS](0)
+
+    cur.filter(_ != null).foreach { row =>
+      prev = addToTopK(prev, maxNumOfEdges, row)
+    }
+
+    buffer1.update(0, prev)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[ROWS](0)
+    takeTopK(ls, maxNumOfEdges)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int) = {
+    val sorted = ls.sorted
+    if (sorted.size <= k) sorted else sorted.take(k)
+  }
+}
 
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
   import S2EdgeDataAggregate._
 
   implicit val ord = rowOrderingDesc
@@ -106,7 +231,6 @@ class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction
     val sorted = ls.sorted
     if (sorted.size <= k) sorted else sorted.take(k)
   }
-
   /* not optimized */
   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
     val cur = buffer2.getAs[Seq[Row]](0)
@@ -125,7 +249,6 @@ class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction
 }
 
 class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
-
   import S2EdgeDataAggregate._
 
   implicit val ord = rowOrdering
@@ -159,7 +282,8 @@ class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFun
     val cur = buffer2.getAs[Seq[Row]](0)
     val prev = buffer1.getAs[Seq[Row]](0)
 
-    buffer1.update(0, mergeTwoSeq(cur, prev, maxNumOfEdges))
+    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
+    buffer1.update(0, merged)
   }
 
   override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)

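addToTopK treats the mutable.Seq as an array-backed binary min-heap (parent at (i - 1) / 2, children at 2 * i + 1 and 2 * i + 2), so index 0 always holds the smallest retained element and is evicted whenever a larger one arrives. Note that it returns a fresh sequence while the buffer is still growing but mutates in place once full, hence the reassignment below. A minimal sketch on plain Ints:

    import org.apache.s2graph.s2jobs.wal.udafs.S2EdgeDataAggregate.addToTopK

    import scala.collection.mutable

    object AddToTopKSketch {
      def main(args: Array[String]): Unit = {
        // keep the top-3 largest ints; topK(0) is always the smallest retained
        var topK: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
        Seq(5, 1, 9, 7, 3, 8).foreach { i =>
          topK = addToTopK(topK, 3, i)
        }
        println(topK.sorted) // ArrayBuffer(7, 8, 9)
      }
    }
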
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
index 72c8cf5..a397f7e 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
@@ -2,10 +2,13 @@ package org.apache.s2graph.s2jobs.wal.process
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.WalLog
+import org.apache.s2graph.s2jobs.wal.{BoundedPriorityQueue, WalLog}
 import org.apache.s2graph.s2jobs.wal.udafs._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
+import scala.collection.mutable
+import scala.util.Random
+
 class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
   val walLogsLs = Seq(
     WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
@@ -24,11 +27,12 @@ class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeA
   test("test S2EdgeDataAggregateProcess") {
     import spark.sqlContext.implicits._
 
-    val edges = spark.createDataset((0 until 10).flatMap(ith => walLogsLs)).toDF()
+    val edges = spark.createDataset(walLogsLs).toDF()
     val inputMap = Map("edges" -> edges)
     val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
       options = Map("maxNumOfEdges" -> "10",
-        "groupByAggClassName" -> "GroupByAgg"))
+        "runOrderBy" -> "false",
+        "groupByAggClassName" -> "GroupByAggOptimized"))
 
     val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
     val processed = job.execute(spark, inputMap)
@@ -84,4 +88,36 @@ class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeA
       println(x)
     }
   }
+
+  test("addToTopK test.") {
+    import S2EdgeDataAggregate._
+    val numOfTest = 100
+    val numOfNums = 100
+    val maxNum = 10
+
+    (0 until numOfTest).foreach { testNum =>
+      val maxSize = 1 + Random.nextInt(numOfNums)
+      val pq = new BoundedPriorityQueue[Int](maxSize)
+      val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
+      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
+
+      arr.foreach { i =>
+        pq += i
+        result = addToTopK(result, maxSize, i)
+      }
+      result.toSeq.sorted shouldBe pq.toSeq.sorted
+    }
+
+//    val maxSize = 1 + Random.nextInt(numOfNums)
+//    val pq = new BoundedPriorityQueue[Int](maxSize)
+//    val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
+//    val result = mutable.ArrayBuffer.empty[Int]
+//    var lastPos = 0
+//    arr.foreach { i =>
+//      pq += i
+//      addToTopK(result, lastPos, maxSize, i)
+//      lastPos = lastPos + 1
+//    }
+//    result.toSeq.sorted shouldBe pq.toSeq.sorted
+  }
 }
\ No newline at end of file

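The test exercises the aggregate through S2EdgeDataAggregateProcess; the same UDAF can also be registered for plain Spark SQL. A sketch, assuming Spark 2.x UDAF registration and, per the aggregateColumns default above, input fields (timestamp: Long, to, label, props: String); the function name, view name, and grouping key are illustrative:

    import org.apache.s2graph.s2jobs.wal.udafs.GroupByAggOptimized
    import org.apache.spark.sql.SparkSession

    object GroupByAggSqlSketch {
      def main(args: Array[String]): Unit = {
        val ss = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        import ss.implicits._

        // register the heap-based aggregate under an illustrative SQL name
        ss.udf.register("topKEdges", new GroupByAggOptimized(10))

        // a toy frame with the four input columns plus a grouping key
        Seq(
          ("a", 1L, "b", "friends", """{"name": 1}"""),
          ("a", 2L, "c", "friends", """{"name": 2}""")
        ).toDF("from", "timestamp", "to", "label", "props")
          .createOrReplaceTempView("wal")

        // `from` and `to` are SQL keywords, hence the backticks
        ss.sql(
          "SELECT `from`, topKEdges(timestamp, `to`, label, props) AS edges " +
            "FROM wal GROUP BY `from`"
        ).show(false)
      }
    }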