add Heap version for GroupByAgg.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e94a029a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e94a029a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e94a029a

Branch: refs/heads/master
Commit: e94a029a795794b94d490223559a3243e5c68950
Parents: bbc6468
Author: DO YUNG YOON <[email protected]>
Authored: Tue Jul 17 00:20:18 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Jul 17 16:49:25 2018 +0900

----------------------------------------------------------------------
 .../s2jobs/wal/BoundedPriorityQueue.scala       |  46 +++++++
 .../process/S2EdgeDataAggregateProcess.scala    |   2 +-
 .../s2jobs/wal/udafs/S2EdgeDataAggregate.scala  | 138 ++++++++++++++++++-
 .../S2EdgeDataAggregateProcessTest.scala        |  42 +++++-
 4 files changed, 217 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
new file mode 100644
index 0000000..ce03169
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/BoundedPriorityQueue.scala
@@ -0,0 +1,46 @@
+package org.apache.s2graph.s2jobs.wal
+
+import scala.collection.generic.Growable
+import scala.collection.JavaConverters._
+import java.util.{PriorityQueue => JPriorityQueue}
+
+
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  override def size: Int = underlying.size
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear() { underlying.clear() }
+
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else {
+      false
+    }
+  }
+}
\ No newline at end of file
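
For reference, a minimal sketch of how the queue above behaves (illustrative only, not part of this commit): it holds at most maxSize elements, and once full it only admits a new element that is strictly greater than the current minimum, so it retains the top-K values under the given Ordering.

    import org.apache.s2graph.s2jobs.wal.BoundedPriorityQueue

    object BoundedPriorityQueueExample {
      def main(args: Array[String]): Unit = {
        // keep only the 3 largest values seen so far
        val topK = new BoundedPriorityQueue[Int](3)
        topK ++= Seq(5, 1, 9, 7, 3, 8)
        // iteration order is heap order, so sort for display; prints List(7, 8, 9)
        println(topK.toList.sorted)
      }
    }
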

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
index a1d17f1..cdc6c8c 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcess.scala
@@ -16,7 +16,7 @@ class S2EdgeDataAggregateProcess(taskConf: TaskConf) extends org.apache.s2graph.
     val aggregateColumns = taskConf.options.get("aggregateColumns").getOrElse("timestamp,to,label,props").split(",").map(col(_))
     taskConf.options.get("parallelism").map(ss.sqlContext.setConf("spark.sql.shuffle.partitions", _))
 
-    val aggregator = new GroupByAgg(maxNumOfEdges)
+    val aggregator = S2EdgeDataAggregate(maxNumOfEdges)
 
     val edges = inputMap(taskConf.inputs.head)
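
The process now obtains its aggregator from the S2EdgeDataAggregate factory (which, per the udafs diff below, returns the heap-based GroupByAggOptimized) and applies it as an ordinary Spark UDAF. A minimal usage sketch follows; it is illustrative only, and the grouping key "from" plus the aggregate columns are assumptions taken from the defaults in this diff, not the exact process code.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.s2graph.s2jobs.wal.udafs.S2EdgeDataAggregate

    // Hypothetical helper: keep at most maxNumOfEdges rows per source vertex.
    def aggregateEdges(edges: DataFrame, maxNumOfEdges: Int = 1000): DataFrame = {
      val aggregator = S2EdgeDataAggregate(maxNumOfEdges)
      edges
        .groupBy(col("from"))
        .agg(aggregator(col("timestamp"), col("to"), col("label"), col("props")).as("edges"))
    }
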

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
index 1b12235..dabfd99 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/udafs/S2EdgeDataAggregate.scala
@@ -5,6 +5,9 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
 
+import scala.annotation.tailrec
+import scala.collection.mutable
+
 object S2EdgeDataAggregate {
 
   type Element = (Long, String, String, String)
@@ -33,8 +36,77 @@ object S2EdgeDataAggregate {
 
   val arrayType = ArrayType(elementType = StructType(fields))
 
-  def apply(maxNumOfEdges: Int = 1000): GroupByAgg = {
-    new GroupByAgg(maxNumOfEdges)
+  def apply(maxNumOfEdges: Int = 1000): GroupByAggOptimized = {
+    new GroupByAggOptimized(maxNumOfEdges)
+  }
+
+  def swap[T](array: mutable.Seq[T], i: Int, j: Int) = {
+    val tmp = array(i)
+    array(i) = array(j)
+    array(j) = tmp
+  }
+
+  @tailrec
+  def percolateDown[T](array: mutable.Seq[T], idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    val left = 2 * idx + 1
+    val right = 2 * idx + 2
+    var smallest = idx
+
+    if (left < array.size && ordering.compare(array(left), array(smallest)) < 0) {
+      smallest = left
+    }
+
+    if (right < array.size && ordering.compare(array(right), array(smallest)) < 0) {
+      smallest = right
+    }
+
+    if (smallest != idx) {
+      swap(array, idx, smallest)
+      percolateDown(array, smallest)
+    }
+  }
+
+  def percolateUp[T](array: mutable.Seq[T],
+                     idx: Int)(implicit ordering: Ordering[T]): Unit = {
+    var pos = idx
+    var parent = (pos - 1) / 2
+    while (parent >= 0 && ordering.compare(array(pos), array(parent)) < 0) {
+      // swap pos and parent, since array(parent) > array(pos)
+      swap(array, parent, pos)
+      pos = parent
+      parent = (pos - 1) / 2
+    }
+  }
+
+  def addToTopK[T](array: mutable.Seq[T],
+                   size: Int,
+                   newData: T)(implicit ordering: Ordering[T]): mutable.Seq[T] = {
+    // use array as minHeap to keep track of topK.
+    // parent = (i - 1) / 2
+    // left child = 2 * i + 1
+    // right child = 2 * i + 2
+
+    // check if array is already full.
+    if (array.size >= size) {
+      // compare newData to min. newData < array(0)
+      val currentMin = array(0)
+      if (ordering.compare(newData, currentMin) < 0) {
+        // drop newData
+      } else {
+        // delete min
+        array(0) = newData
+        // percolate down
+        percolateDown(array, 0)
+      }
+      array
+    } else {
+      // append new element into sequence since there is room left.
+      val newArray = array :+ newData
+      val idx = newArray.size - 1
+      // percolate up last element
+      percolateUp(newArray, idx)
+      newArray
+    }
   }
 
   def mergeTwoSeq[T](prev: Seq[T], cur: Seq[T], size: Int)(implicit ordering: Ordering[T]): Seq[T] = {
@@ -46,7 +118,7 @@ object S2EdgeDataAggregate {
     val arr = new mutable.ArrayBuffer[T](size)
 
     while (idx < size && i < n && j < m) {
-      if (ordering.compare(cur(i), prev(j)) > 0) {
+      if (ordering.compare(cur(i), prev(j)) < 0) {
         arr += cur(i)
         i += 1
       } else {
@@ -68,8 +140,61 @@
   }
 }
 
-class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+class GroupByAggOptimized(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
+
+  import S2EdgeDataAggregate._
+
+  implicit val ord = rowOrdering
+
+  val arrayType = ArrayType(elementType = StructType(fields))
+
+  type ROWS = mutable.Seq[Row]
+
+  override def inputSchema: StructType = StructType(fields)
+
+  override def bufferSchema: StructType = StructType(Seq(
+    StructField(name = "edges", dataType = arrayType)
+  ))
+
+  override def dataType: DataType = arrayType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, mutable.ArrayBuffer.empty[Row])
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val prev = buffer.getAs[ROWS](0)
+
+    val updated = addToTopK(prev, maxNumOfEdges, input)
+
+    buffer.update(0, updated)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    var prev = buffer1.getAs[ROWS](0)
+    val cur = buffer2.getAs[ROWS](0)
+
+    cur.filter(_ != null).foreach { row =>
+      prev = addToTopK(prev, maxNumOfEdges, row)
+    }
+
+    buffer1.update(0, prev)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    val ls = buffer.getAs[ROWS](0)
+    takeTopK(ls, maxNumOfEdges)
+  }
+
+  private def takeTopK(ls: Seq[Row], k: Int) = {
+    val sorted = ls.sorted
+    if (sorted.size <= k) sorted else sorted.take(k)
+  }
+}
+class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
 
   import S2EdgeDataAggregate._
 
   implicit val ord = rowOrderingDesc
@@ -106,7 +231,6 @@ class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction
     val sorted = ls.sorted
     if (sorted.size <= k) sorted else sorted.take(k)
   }
-  /* not optimized */
 
   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
     val cur = buffer2.getAs[Seq[Row]](0)
@@ -125,7 +249,6 @@ class GroupByAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction
 }
 
 class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFunction {
-
   import S2EdgeDataAggregate._
 
   implicit val ord = rowOrdering
@@ -159,7 +282,8 @@ class GroupByArrayAgg(maxNumOfEdges: Int = 1000) extends UserDefinedAggregateFun
     val cur = buffer2.getAs[Seq[Row]](0)
     val prev = buffer1.getAs[Seq[Row]](0)
 
-    buffer1.update(0, mergeTwoSeq(cur, prev, maxNumOfEdges))
+    val merged = mergeTwoSeq(cur, prev, maxNumOfEdges)
+    buffer1.update(0, merged)
   }
 
   override def evaluate(buffer: Row): Any = buffer.getAs[Seq[Row]](0)
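
For reference, a minimal standalone sketch of the addToTopK helper introduced above (not part of the commit): the buffer is treated as an array-backed min-heap of at most `size` elements, so after a stream of updates it retains the `size` largest values under the implicit Ordering; note that the buffer itself stays in heap order, not sorted order.

    import scala.collection.mutable
    import org.apache.s2graph.s2jobs.wal.udafs.S2EdgeDataAggregate.addToTopK

    object AddToTopKExample {
      def main(args: Array[String]): Unit = {
        var topK: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
        Seq(4, 9, 1, 7, 3, 8, 2).foreach { v =>
          // addToTopK returns the (possibly new) buffer, so reassign on every call
          topK = addToTopK(topK, 3, v)
        }
        // prints List(7, 8, 9)
        println(topK.toList.sorted)
      }
    }
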

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e94a029a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
index 72c8cf5..a397f7e 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/wal/process/S2EdgeDataAggregateProcessTest.scala
@@ -2,10 +2,13 @@ package org.apache.s2graph.s2jobs.wal.process
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.s2graph.s2jobs.wal.WalLog
+import org.apache.s2graph.s2jobs.wal.{BoundedPriorityQueue, WalLog}
 import org.apache.s2graph.s2jobs.wal.udafs._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
+import scala.collection.mutable
+import scala.util.Random
+
 class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeAndAfterAll with DataFrameSuiteBase {
   val walLogsLs = Seq(
     WalLog(1L, "insert", "edge", "a", "b", "s2graph", "friends", """{"name": 1}"""),
@@ -24,11 +27,12 @@ class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeA
   test("test S2EdgeDataAggregateProcess") {
     import spark.sqlContext.implicits._
 
-    val edges = spark.createDataset((0 until 10).flatMap(ith => walLogsLs)).toDF()
+    val edges = spark.createDataset(walLogsLs).toDF()
     val inputMap = Map("edges" -> edges)
     val taskConf = new TaskConf(name = "test", `type` = "agg", inputs = Seq("edges"),
       options = Map("maxNumOfEdges" -> "10",
-        "groupByAggClassName" -> "GroupByAgg"))
+        "runOrderBy" -> "false",
+        "groupByAggClassName" -> "GroupByAggOptimized"))
 
     val job = new S2EdgeDataAggregateProcess(taskConf = taskConf)
     val processed = job.execute(spark, inputMap)
@@ -84,4 +88,36 @@ class S2EdgeDataAggregateProcessTest extends FunSuite with Matchers with BeforeA
       println(x)
     }
   }
+
+  test("addToTopK test.") {
+    import S2EdgeDataAggregate._
+    val numOfTest = 100
+    val numOfNums = 100
+    val maxNum = 10
+
+    (0 until numOfTest).foreach { testNum =>
+      val maxSize = 1 + Random.nextInt(numOfNums)
+      val pq = new BoundedPriorityQueue[Int](maxSize)
+      val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
+      var result: mutable.Seq[Int] = mutable.ArrayBuffer.empty[Int]
+
+      arr.foreach { i =>
+        pq += i
+        result = addToTopK(result, maxSize, i)
+      }
+      result.toSeq.sorted shouldBe pq.toSeq.sorted
+    }
+
+//    val maxSize = 1 + Random.nextInt(numOfNums)
+//    val pq = new BoundedPriorityQueue[Int](maxSize)
+//    val arr = (0 until numOfNums).map(x => Random.nextInt(maxNum))
+//    val result = mutable.ArrayBuffer.empty[Int]
+//    var lastPos = 0
+//    arr.foreach { i =>
+//      pq += i
+//      addToTopK(result, lastPos, maxSize, i)
+//      lastPos = lastPos + 1
+//    }
+//    result.toSeq.sorted shouldBe pq.toSeq.sorted
+  }
 }
\ No newline at end of file
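
To run just the updated suite locally, an sbt invocation along these lines should work, assuming the module is named s2jobs in the build as the source layout suggests:

    sbt "s2jobs/testOnly org.apache.s2graph.s2jobs.wal.process.S2EdgeDataAggregateProcessTest"
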
