Add vertices.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/aa8bd35e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/aa8bd35e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/aa8bd35e Branch: refs/heads/master Commit: aa8bd35e3a7eb43905b7d0d199140aeb4d579997 Parents: 6488960 Author: DO YUNG YOON <[email protected]> Authored: Fri Oct 19 13:50:35 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Oct 19 13:50:35 2018 +0900 ---------------------------------------------------------------------- .../org/apache/s2graph/s2jobs/wal/WalLog.scala | 19 ++++++++++++------- .../apache/s2graph/s2jobs/task/ProcessTest.scala | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa8bd35e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index 4080045..528f170 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -48,23 +48,28 @@ object WalLogAgg { } def merge(iter: Iterator[WalLogAgg], - param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { - val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize) - val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize) - - val validTimestampDuration = param.validTimestampDuration + heapSize: Int, + validTimestampDuration: Option[Long], + sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { + val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize) + val vertexHeap = new 
BoundedPriorityQueue[WalLog](heapSize) iter.foreach { walLogAgg => addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration) addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration) } - val topVertices = if (param.sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray - val topEdges = if (param.sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray + val topVertices = if (sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray + val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges)) } + def merge(iter: Iterator[WalLogAgg], + param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { + merge(iter, param.heapSize, param.validTimestampDuration, param.sortTopItems) + } + private def filterPropsInner(walLogs: Seq[WalLog], transformers: Seq[Transformer], validFeatureHashKeys: Set[Long]): Seq[WalLog] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa8bd35e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala index d9bbb5b..5539e43 100644 --- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala +++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala @@ -26,7 +26,7 @@ class ProcessTest extends FunSuite with DataFrameSuiteBase { test("SqlProcess execute sql") { import spark.implicits._ - + val inputDF = Seq( ("a", "b", "friend"), ("a", "c", "friend"),
