add vertices.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/aa8bd35e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/aa8bd35e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/aa8bd35e

Branch: refs/heads/master
Commit: aa8bd35e3a7eb43905b7d0d199140aeb4d579997
Parents: 6488960
Author: DO YUNG YOON <[email protected]>
Authored: Fri Oct 19 13:50:35 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Oct 19 13:50:35 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala   | 19 ++++++++++++-------
 .../apache/s2graph/s2jobs/task/ProcessTest.scala |  2 +-
 2 files changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa8bd35e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala 
b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index 4080045..528f170 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -48,23 +48,28 @@ object WalLogAgg {
   }
 
   def merge(iter: Iterator[WalLogAgg],
-            param: AggregateParam)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
-    val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
-    val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
-
-    val validTimestampDuration = param.validTimestampDuration
+            heapSize: Int,
+            validTimestampDuration: Option[Long],
+            sortTopItems: Boolean)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
+    val edgeHeap = new BoundedPriorityQueue[WalLog](heapSize)
+    val vertexHeap = new BoundedPriorityQueue[WalLog](heapSize)
 
     iter.foreach { walLogAgg =>
       addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration)
       addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration)
     }
 
-    val topVertices = if (param.sortTopItems) 
vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
-    val topEdges = if (param.sortTopItems) 
edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray
+    val topVertices = if (sortTopItems) 
vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
+    val topEdges = if (sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) 
else edgeHeap.toArray
 
     topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, 
topEdges))
   }
 
+  def merge(iter: Iterator[WalLogAgg],
+            param: AggregateParam)(implicit ord: Ordering[WalLog]): 
Option[WalLogAgg] = {
+    merge(iter, param.heapSize, param.validTimestampDuration, 
param.sortTopItems)
+  }
+
   private def filterPropsInner(walLogs: Seq[WalLog],
                           transformers: Seq[Transformer],
                           validFeatureHashKeys: Set[Long]): Seq[WalLog] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa8bd35e/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
----------------------------------------------------------------------
diff --git 
a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala 
b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
index d9bbb5b..5539e43 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/ProcessTest.scala
@@ -26,7 +26,7 @@ class ProcessTest extends FunSuite with DataFrameSuiteBase {
 
   test("SqlProcess execute sql") {
     import spark.implicits._
-
+    
     val inputDF = Seq(
       ("a", "b", "friend"),
       ("a", "c", "friend"),

Reply via email to