Refactor WalLogAgg class.
  - separate edges and vertices.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/64889605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/64889605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/64889605

Branch: refs/heads/master
Commit: 64889605d874af2dc71f1ca8b64a6a17ae386e4e
Parents: 651ca2c
Author: DO YUNG YOON <steams...@apache.org>
Authored: Thu Oct 11 11:00:13 2018 +0900
Committer: DO YUNG YOON <steams...@apache.org>
Committed: Thu Oct 11 11:00:13 2018 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/s2jobs/Job.scala   |  3 +-
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  | 72 ++++++++++++++------
 .../wal/process/WalLogAggregateProcess.scala    |  4 +-
 .../wal/process/params/AggregateParam.scala     |  7 +-
 4 files changed, 58 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 8f21bc2..6d9f509 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -63,8 +63,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log
     val dfKeys = dfMap.keySet
 
     processes.filter{ p =>
-        var existAllInput = true
-        p.conf.inputs.foreach { input => existAllInput = dfKeys(input) }
+        val existAllInput = p.conf.inputs.forall { input => dfKeys(input) }
         !dfKeys(p.conf.name) && existAllInput
     }
     .map { p =>
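
Note on the Job.scala hunk above: the previous check used a mutable flag that was
overwritten on every iteration, so only the last input decided the result, while
forall short-circuits and requires every input to be present. A minimal,
self-contained sketch of the difference (hypothetical names and values, not part
of the commit):

// Sketch only: dfKeys/inputs are made-up values illustrating the two checks.
object ForallVsVarCheck {
  def main(args: Array[String]): Unit = {
    val dfKeys = Set("a", "b")         // registered data frames (hypothetical)
    val inputs = Seq("missing", "a")   // one input is not registered

    // old behaviour: the flag only keeps the result of the last input
    var existAllInputOld = true
    inputs.foreach { input => existAllInputOld = dfKeys(input) }

    // new behaviour: every input must exist; stops at the first failure
    val existAllInputNew = inputs.forall(input => dfKeys(input))

    println(existAllInputOld) // true  -- the missing input is silently ignored
    println(existAllInputNew) // false -- the missing input is detected
  }
}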

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index 9617ca1..4080045 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,7 +1,7 @@
 package org.apache.s2graph.s2jobs.wal
 
 import com.google.common.hash.Hashing
-import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.{GraphUtil, JSONParser}
 import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.transformer.Transformer
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
@@ -12,10 +12,18 @@ import play.api.libs.json.{JsObject, Json}
 import scala.util.Try
 
 object WalLogAgg {
-  val outputColumns = Seq("from", "logs", "maxTs", "minTs")
+  val outputColumns = Seq("from", "vertices", "edges")
+
+  def isEdge(walLog: WalLog): Boolean = {
+    walLog.elem == "edge" || walLog.elem == "e"
+  }
 
   def apply(walLog: WalLog): WalLogAgg = {
-    new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
+    val (vertices, edges) =
+      if (isEdge(walLog)) (Nil, Seq(walLog))
+      else (Seq(walLog), Nil)
+
+    new WalLogAgg(walLog.from, vertices, edges)
   }
 
   def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value)
@@ -24,29 +32,43 @@ object WalLogAgg {
     Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
   }
 
-  def merge(iter: Iterator[WalLogAgg],
-            param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
-    val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
-    var minTs = Long.MaxValue
-    var maxTs = Long.MinValue
+  private def addToHeap(iter: Seq[WalLog],
+                        heap: BoundedPriorityQueue[WalLog],
+                        validTimestampDuration: Option[Long]): Unit = {
+    val now = System.currentTimeMillis()
 
-    iter.foreach { walLogAgg =>
-      minTs = Math.min(walLogAgg.minTs, minTs)
-      maxTs = Math.max(walLogAgg.maxTs, maxTs)
+    iter.foreach { walLog =>
+      val ts = walLog.timestamp
+      val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true)
 
-      walLogAgg.logs.foreach { walLog =>
+      if (isValid) {
         heap += walLog
       }
     }
-    val topItems = if (param.sortTopItems) heap.toArray.sortBy(-_.timestamp) else heap.toArray
+  }
+
+  def merge(iter: Iterator[WalLogAgg],
+            param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = {
+    val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
+    val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize)
+
+    val validTimestampDuration = param.validTimestampDuration
 
-    WalLogAgg(topItems.head.from, topItems, maxTs, minTs)
+    iter.foreach { walLogAgg =>
+      addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration)
+      addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration)
+    }
+
+    val topVertices = if (param.sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray
+    val topEdges = if (param.sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray
+
+    topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges))
   }
 
-  def filterProps(walLogAgg: WalLogAgg,
-                  transformers: Seq[Transformer],
-                  validFeatureHashKeys: Set[Long]) = {
-    val filtered = walLogAgg.logs.map { walLog =>
+  private def filterPropsInner(walLogs: Seq[WalLog],
+                          transformers: Seq[Transformer],
+                          validFeatureHashKeys: Set[Long]): Seq[WalLog] = {
+    walLogs.map { walLog =>
       val fields = walLog.propsJson.fields.filter { case (propKey, propValue) =>
         val filtered = transformers.flatMap { transformer =>
         transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal)))
@@ -56,8 +78,15 @@ object WalLogAgg {
 
       walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString)
     }
+  }
+
+  def filterProps(walLogAgg: WalLogAgg,
+                        transformers: Seq[Transformer],
+                        validFeatureHashKeys: Set[Long]) = {
+    val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys)
+    val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys)
 
-    walLogAgg.copy(logs = filtered)
+    walLogAgg.copy(vertices = filteredVertices, edges = filteredEdges)
   }
 }
 
@@ -88,9 +117,8 @@ object DimVal {
 case class DimVal(dim: String, value: String)
 
 case class WalLogAgg(from: String,
-                     logs: Seq[WalLog],
-                     maxTs: Long,
-                     minTs: Long)
+                     vertices: Seq[WalLog],
+                     edges: Seq[WalLog])
 
 case class WalLog(timestamp: Long,
                   operation: String,
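
Note on the WalLog.scala changes above: WalLogAgg now carries vertex logs and
edge logs in separate sequences, and the min/max timestamp fields are gone. A
simplified, standalone sketch of the split-and-merge idea, using stand-in types
rather than the project's actual classes (the real merge additionally uses
bounded priority queues and the validTimestampDuration window):

// Simplified stand-ins for WalLog/WalLogAgg, for illustration only.
case class MiniLog(timestamp: Long, elem: String, from: String)
case class MiniAgg(from: String, vertices: Seq[MiniLog], edges: Seq[MiniLog])

object MiniAggDemo {
  def isEdge(log: MiniLog): Boolean = log.elem == "edge" || log.elem == "e"

  // mirrors WalLogAgg.apply: a single log lands in exactly one of the two sequences
  def fromLog(log: MiniLog): MiniAgg =
    if (isEdge(log)) MiniAgg(log.from, Nil, Seq(log))
    else MiniAgg(log.from, Seq(log), Nil)

  def main(args: Array[String]): Unit = {
    val logs = Seq(MiniLog(1L, "v", "u1"), MiniLog(2L, "e", "u1"))
    val merged = logs.map(fromLog).reduce { (a, b) =>
      MiniAgg(a.from, a.vertices ++ b.vertices, a.edges ++ b.edges)
    }
    println(merged) // MiniAgg(u1,List(MiniLog(1,v,u1)),List(MiniLog(2,e,u1)))
  }
}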

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
index ee4debd..aebb1cc 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala
@@ -10,7 +10,7 @@ object WalLogAggregateProcess {
                 dataset: Dataset[WalLogAgg],
                 aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]) = {
     import ss.implicits._
-    dataset.groupByKey(_.from).mapGroups { case (_, iter) =>
+    dataset.groupByKey(_.from).flatMapGroups { case (_, iter) =>
       WalLogAgg.merge(iter, aggregateParam)
     }.toDF(WalLogAgg.outputColumns: _*)
   }
@@ -20,7 +20,7 @@ object WalLogAggregateProcess {
                    aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = {
     import ss.implicits._
 
-    dataset.groupByKey(walLog => walLog.from).mapGroups { case (key, iter) =>
+    dataset.groupByKey(walLog => walLog.from).flatMapGroups { case (key, iter) =>
       WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam)
     }.toDF(WalLogAgg.outputColumns: _*)
   }
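
Note on the WalLogAggregateProcess.scala hunks above: merge now returns
Option[WalLogAgg] (None when a group ends up with no edges), so mapGroups
becomes flatMapGroups and empty groups simply drop out of the output. A hedged,
standalone Spark sketch of that pattern with simplified stand-in types, not the
project's classes:

import org.apache.spark.sql.SparkSession

// Illustration only: MiniLog stands in for WalLog; mergeGroup stands in for
// WalLogAgg.merge and returns None when the group contains no edge logs.
object FlatMapGroupsSketch {
  case class MiniLog(from: String, elem: String, timestamp: Long)

  def mergeGroup(iter: Iterator[MiniLog]): Option[MiniLog] =
    iter.filter(l => l.elem == "e" || l.elem == "edge").toSeq.sortBy(-_.timestamp).headOption

  def main(args: Array[String]): Unit = {
    val ss = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import ss.implicits._

    val ds = Seq(
      MiniLog("u1", "e", 10L),  // group u1 has an edge -> one output row
      MiniLog("u2", "v", 20L)   // group u2 has vertices only -> dropped
    ).toDS()

    // groups whose merge result is None produce no row at all
    val out = ds.groupByKey(_.from).flatMapGroups { case (_, iter) => mergeGroup(iter).toSeq }
    out.show()

    ss.stop()
  }
}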

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
index 4cb1377..ecc12e3 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala
@@ -14,12 +14,14 @@ object AggregateParam {
     val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(defaultIsArrayType)
     val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems)
     val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
+    val validTimestampDuration = taskConf.options.get("validTimestampDuration").map(_.toLong).getOrElse(Long.MaxValue)
 
     new AggregateParam(groupByKeys = groupByKeys,
       topK = Option(maxNumOfEdges),
       isArrayType = Option(arrayType),
       shouldSortTopItems = Option(sortTopItems),
-      numOfPartitions = numOfPartitions
+      numOfPartitions = numOfPartitions,
+      validTimestampDuration = Option(validTimestampDuration)
     )
   }
 }
@@ -28,7 +30,8 @@ case class AggregateParam(groupByKeys: Option[Seq[String]],
                           topK: Option[Int],
                           isArrayType: Option[Boolean],
                           shouldSortTopItems: Option[Boolean],
-                          numOfPartitions: Option[Int]) {
+                          numOfPartitions: Option[Int],
+                          validTimestampDuration: Option[Long]) {
 
   import AggregateParam._
 
