Refactor WalLogAgg class: separate edges and vertices.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/64889605 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/64889605 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/64889605 Branch: refs/heads/master Commit: 64889605d874af2dc71f1ca8b64a6a17ae386e4e Parents: 651ca2c Author: DO YUNG YOON <steams...@apache.org> Authored: Thu Oct 11 11:00:13 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Oct 11 11:00:13 2018 +0900 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/s2jobs/Job.scala | 3 +- .../org/apache/s2graph/s2jobs/wal/WalLog.scala | 72 ++++++++++++++------ .../wal/process/WalLogAggregateProcess.scala | 4 +- .../wal/process/params/AggregateParam.scala | 7 +- 4 files changed, 58 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala index 8f21bc2..6d9f509 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala @@ -63,8 +63,7 @@ class Job(ss:SparkSession, jobDesc:JobDescription) extends Serializable with Log val dfKeys = dfMap.keySet processes.filter{ p => - var existAllInput = true - p.conf.inputs.foreach { input => existAllInput = dfKeys(input) } + val existAllInput = p.conf.inputs.forall { input => dfKeys(input) } !dfKeys(p.conf.name) && existAllInput } .map { p => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala 
---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala index 9617ca1..4080045 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala @@ -1,7 +1,7 @@ package org.apache.s2graph.s2jobs.wal import com.google.common.hash.Hashing -import org.apache.s2graph.core.JSONParser +import org.apache.s2graph.core.{GraphUtil, JSONParser} import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam import org.apache.s2graph.s2jobs.wal.transformer.Transformer import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue @@ -12,10 +12,18 @@ import play.api.libs.json.{JsObject, Json} import scala.util.Try object WalLogAgg { - val outputColumns = Seq("from", "logs", "maxTs", "minTs") + val outputColumns = Seq("from", "vertices", "edges") + + def isEdge(walLog: WalLog): Boolean = { + walLog.elem == "edge" || walLog.elem == "e" + } def apply(walLog: WalLog): WalLogAgg = { - new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp) + val (vertices, edges) = + if (isEdge(walLog)) (Nil, Seq(walLog)) + else (Seq(walLog), Nil) + + new WalLogAgg(walLog.from, vertices, edges) } def toFeatureHash(dimVal: DimVal): Long = toFeatureHash(dimVal.dim, dimVal.value) @@ -24,29 +32,43 @@ object WalLogAgg { Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong() } - def merge(iter: Iterator[WalLogAgg], - param: AggregateParam)(implicit ord: Ordering[WalLog]) = { - val heap = new BoundedPriorityQueue[WalLog](param.heapSize) - var minTs = Long.MaxValue - var maxTs = Long.MinValue + private def addToHeap(iter: Seq[WalLog], + heap: BoundedPriorityQueue[WalLog], + validTimestampDuration: Option[Long]): Unit = { + val now = System.currentTimeMillis() - iter.foreach { walLogAgg => - minTs = Math.min(walLogAgg.minTs, 
minTs) - maxTs = Math.max(walLogAgg.maxTs, maxTs) + iter.foreach { walLog => + val ts = walLog.timestamp + val isValid = validTimestampDuration.map(d => now - ts < d).getOrElse(true) - walLogAgg.logs.foreach { walLog => + if (isValid) { heap += walLog } } - val topItems = if (param.sortTopItems) heap.toArray.sortBy(-_.timestamp) else heap.toArray + } + + def merge(iter: Iterator[WalLogAgg], + param: AggregateParam)(implicit ord: Ordering[WalLog]): Option[WalLogAgg] = { + val edgeHeap = new BoundedPriorityQueue[WalLog](param.heapSize) + val vertexHeap = new BoundedPriorityQueue[WalLog](param.heapSize) + + val validTimestampDuration = param.validTimestampDuration - WalLogAgg(topItems.head.from, topItems, maxTs, minTs) + iter.foreach { walLogAgg => + addToHeap(walLogAgg.vertices, vertexHeap, validTimestampDuration) + addToHeap(walLogAgg.edges, edgeHeap, validTimestampDuration) + } + + val topVertices = if (param.sortTopItems) vertexHeap.toArray.sortBy(-_.timestamp) else vertexHeap.toArray + val topEdges = if (param.sortTopItems) edgeHeap.toArray.sortBy(-_.timestamp) else edgeHeap.toArray + + topEdges.headOption.map(head => WalLogAgg(head.from, topVertices, topEdges)) } - def filterProps(walLogAgg: WalLogAgg, - transformers: Seq[Transformer], - validFeatureHashKeys: Set[Long]) = { - val filtered = walLogAgg.logs.map { walLog => + private def filterPropsInner(walLogs: Seq[WalLog], + transformers: Seq[Transformer], + validFeatureHashKeys: Set[Long]): Seq[WalLog] = { + walLogs.map { walLog => val fields = walLog.propsJson.fields.filter { case (propKey, propValue) => val filtered = transformers.flatMap { transformer => transformer.toDimValLs(walLog, propKey, JSONParser.jsValueToString(propValue)).filter(dimVal => validFeatureHashKeys(toFeatureHash(dimVal))) @@ -56,8 +78,15 @@ object WalLogAgg { walLog.copy(props = Json.toJson(fields.toMap).as[JsObject].toString) } + } + + def filterProps(walLogAgg: WalLogAgg, + transformers: Seq[Transformer], + validFeatureHashKeys: 
Set[Long]) = { + val filteredVertices = filterPropsInner(walLogAgg.vertices, transformers, validFeatureHashKeys) + val filteredEdges = filterPropsInner(walLogAgg.edges, transformers, validFeatureHashKeys) - walLogAgg.copy(logs = filtered) + walLogAgg.copy(vertices = filteredVertices, edges = filteredEdges) } } @@ -88,9 +117,8 @@ object DimVal { case class DimVal(dim: String, value: String) case class WalLogAgg(from: String, - logs: Seq[WalLog], - maxTs: Long, - minTs: Long) + vertices: Seq[WalLog], + edges: Seq[WalLog]) case class WalLog(timestamp: Long, operation: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala index ee4debd..aebb1cc 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/WalLogAggregateProcess.scala @@ -10,7 +10,7 @@ object WalLogAggregateProcess { dataset: Dataset[WalLogAgg], aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]) = { import ss.implicits._ - dataset.groupByKey(_.from).mapGroups { case (_, iter) => + dataset.groupByKey(_.from).flatMapGroups { case (_, iter) => WalLogAgg.merge(iter, aggregateParam) }.toDF(WalLogAgg.outputColumns: _*) } @@ -20,7 +20,7 @@ object WalLogAggregateProcess { aggregateParam: AggregateParam)(implicit ord: Ordering[WalLog]): DataFrame = { import ss.implicits._ - dataset.groupByKey(walLog => walLog.from).mapGroups { case (key, iter) => + dataset.groupByKey(walLog => walLog.from).flatMapGroups { case (key, iter) => WalLogAgg.merge(iter.map(WalLogAgg(_)), aggregateParam) }.toDF(WalLogAgg.outputColumns: _*) } 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/64889605/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala ---------------------------------------------------------------------- diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala index 4cb1377..ecc12e3 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/AggregateParam.scala @@ -14,12 +14,14 @@ object AggregateParam { val arrayType = taskConf.options.get("arrayType").map(_.toBoolean).getOrElse(defaultIsArrayType) val sortTopItems = taskConf.options.get("sortTopItems").map(_.toBoolean).getOrElse(defaultShouldSortTopItems) val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt) + val validTimestampDuration = taskConf.options.get("validTimestampDuration").map(_.toLong).getOrElse(Long.MaxValue) new AggregateParam(groupByKeys = groupByKeys, topK = Option(maxNumOfEdges), isArrayType = Option(arrayType), shouldSortTopItems = Option(sortTopItems), - numOfPartitions = numOfPartitions + numOfPartitions = numOfPartitions, + validTimestampDuration = Option(validTimestampDuration) ) } } @@ -28,7 +30,8 @@ case class AggregateParam(groupByKeys: Option[Seq[String]], topK: Option[Int], isArrayType: Option[Boolean], shouldSortTopItems: Option[Boolean], - numOfPartitions: Option[Int]) { + numOfPartitions: Option[Int], + validTimestampDuration: Option[Long]) { import AggregateParam._