add FilterWalLogAggProcess.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d9af2b81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d9af2b81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d9af2b81

Branch: refs/heads/master
Commit: d9af2b816e4745a7ca0d8a66964f124ac5779988
Parents: 5122e98
Author: DO YUNG YOON <[email protected]>
Authored: Mon Aug 13 18:22:30 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Aug 13 18:22:30 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  | 17 +++++
 .../wal/process/FeatureIndexProcess.scala       | 49 +------------
 .../wal/process/FilterWalLogAggProcess.scala    | 75 ++++++++++++++++++++
 .../wal/process/params/FeatureIndexParam.scala  |  7 +-
 .../process/params/FilterWalLogAggParam.scala   |  7 ++
 5 files changed, 101 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index f4a670b..e4c3467 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,5 +1,6 @@
 package org.apache.s2graph.s2jobs.wal
 
+import com.google.common.hash.Hashing
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
@@ -16,6 +17,22 @@ object WalLogAgg {
     new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
   }
 
+  def toFeatureHash(dim: String, value: String): Long = {
+    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
+  }
+
+  def filter(walLogAgg: WalLogAgg, validFeatureHashKeys: Set[Long]) = {
+    val filtered = walLogAgg.logs.map { walLog =>
+      val fields = Json.parse(walLog.props).as[JsObject].fields.filter { case (dim, jsValue) =>
+        validFeatureHashKeys(toFeatureHash(dim, JSONParser.jsValueToString(jsValue)))
+      }
+
+      walLog.copy(props = Json.toJson(fields).as[JsObject].toString)
+    }
+
+    walLogAgg.copy(logs = filtered)
+  }
+
   def merge(iter: Iterator[WalLogAgg],
             param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
     val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
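
The WalLog.scala hunk above gives WalLogAgg its own feature-hash scheme: each
(dim, value) pair is hashed as the UTF-8 bytes of "dim:value" with Guava's
murmur3_128, reduced to a Long, and filter() then keeps only the props fields
whose hash lands in a whitelist. A minimal self-contained sketch of that
behavior, with purely illustrative dim/value pairs:

    import com.google.common.hash.Hashing

    object FeatureHashSketch extends App {
      // Same scheme as WalLogAgg.toFeatureHash: 128-bit murmur3 of "dim:value",
      // reduced to a Long via asLong().
      def toFeatureHash(dim: String, value: String): Long =
        Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()

      // Hypothetical whitelist built from a (dim, value) dictionary.
      val validFeatureHashKeys: Set[Long] = Set(
        toFeatureHash("country", "KR"),
        toFeatureHash("device", "mobile")
      )

      println(validFeatureHashKeys(toFeatureHash("country", "KR")))  // true  -> kept
      println(validFeatureHashKeys(toFeatureHash("country", "US")))  // false -> dropped
    }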

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
index 7d04334..073b0cf 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
@@ -1,16 +1,13 @@
 package org.apache.s2graph.s2jobs.wal.process
 
-import com.google.common.hash.Hashing
 import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam
 import org.apache.s2graph.s2jobs.wal.transformer._
 import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
 import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
-import play.api.libs.json.{JsObject, Json}
 
 import scala.collection.mutable
 
@@ -62,31 +59,6 @@ object FeatureIndexProcess {
       .withColumn("count", col("dim_count._2"))
       .select("dim", "value", "count", "rank")
   }
-
-  def toFeatureHash(dim: String, value: String): Long = {
-    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
-  }
-
-  def collectDistinctFeatureHashes(ss: SparkSession,
-                                   filteredDict: DataFrame): Array[Long] = {
-    import ss.implicits._
-
-    val featureHashUDF = udf((dim: String, value: String) => toFeatureHash(dim, value))
-
-    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
-      .select("featureHash")
-      .distinct().as[Long].collect()
-  }
-
-  def filterTopKsPerDim(dict: DataFrame,
-                        maxRankPerDim: Broadcast[Map[String, Int]],
-                        defaultMaxRank: Int): DataFrame = {
-    val filterUDF = udf((dim: String, rank: Long) => {
-      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
-    })
-
-    dict.filter(filterUDF(col("dim"), col("rank")))
-  }
 }
 
 case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
@@ -98,20 +70,11 @@ case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2
     val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
     val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
     val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
-    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
-      val json = Json.parse(s).as[JsObject]
-      json.fieldSet.map { case (key, jsValue) => key -> jsValue.as[Int] }.toMap
-    }
-    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
-    val dictPath = taskConf.options.get("dictPath")
 
     numOfPartitions.map { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
 
-    //    val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
-
     val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName = Option(countColumnName),
-      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint,
-      maxRankPerDim = maxRankPerDim, defaultMaxRank = defaultMaxRank, dictPath = dictPath
+      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint
     )
 
     val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
@@ -129,16 +92,6 @@ case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2
     val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
 
     dict
-    //TODO: filter topKs per dim, then build valid dimValLs.
-    // then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal.
-
-    //    dictPath.foreach { path => dict.write.mode(SaveMode.Overwrite).parquet(path) }
-    //
-    //    val filteredDict = filterTopKsPerDim(dict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue))
-    //    val distinctFeatureHashes = collectDistinctFeatureHashes(ss, filteredDict)
-    //    val distinctFeatureHashesBCast = ss.sparkContext.broadcast(distinctFeatureHashes)
-
-    //    filteredDict
   }
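
Net effect of the two hunks above: FeatureIndexProcess now ends at
buildDictionary and simply returns the dict; the rank-based filtering and
feature-hash collection (previously commented out here) move into the new
FilterWalLogAggProcess below, which reads per-dimension rank cutoffs from a
JSON-valued "maxRankPerDim" option. A small sketch of that option parsing,
with a made-up option value:

    import play.api.libs.json.{JsObject, Json}

    object MaxRankOptionSketch extends App {
      // Hypothetical option value; the real one comes from taskConf.options.
      val raw = """{"country": 100, "device": 10}"""

      val maxRankPerDim: Map[String, Int] =
        Json.parse(raw).as[JsObject].fields.map { case (k, v) => k -> v.as[Int] }.toMap

      println(maxRankPerDim)  // Map(country -> 100, device -> 10)
    }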
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
new file mode 100644
index 0000000..3c546a7
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
@@ -0,0 +1,75 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLogAgg
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import play.api.libs.json.{JsObject, Json}
+
+object FilterWalLogAggProcess {
+
+  def collectDistinctFeatureHashes(ss: SparkSession,
+                                   filteredDict: DataFrame): Array[Long] = {
+    import ss.implicits._
+
+    val featureHashUDF = udf((dim: String, value: String) => WalLogAgg.toFeatureHash(dim, value))
+
+    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
+      .select("featureHash")
+      .distinct().as[Long].collect()
+  }
+
+  def filterTopKsPerDim(dict: DataFrame,
+                        maxRankPerDim: Broadcast[Map[String, Int]],
+                        defaultMaxRank: Int): DataFrame = {
+    val filterUDF = udf((dim: String, rank: Long) => {
+      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
+    })
+
+    dict.filter(filterUDF(col("dim"), col("rank")))
+  }
+
+  def filterWalLogAgg(walLogAgg: Dataset[WalLogAgg],
+                      validFeatureHashKeysBCast: Broadcast[Array[Long]]) = {
+    walLogAgg.mapPartitions { iter =>
+      val validFeatureHashKeys = validFeatureHashKeysBCast.value.toSet
+
+      iter.map { walLogAgg =>
+        WalLogAgg.filter(walLogAgg, validFeatureHashKeys)
+      }
+    }
+  }
+}
+
+class FilterWalLogAggProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import FilterWalLogAggProcess._
+
+  /*
+    filter topKs per dim, then build valid dimValLs.
+    then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal.
+   */
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
+      Json.parse(s).as[JsObject].fields.map { case (k, jsValue) =>
+        k -> jsValue.as[Int]
+      }.toMap
+    }
+    val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
+
+    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
+
+    val featureDict = inputMap(taskConf.options("featureDict"))
+    val walLogAgg = inputMap(taskConf.options("walLogAgg")).as[WalLogAgg]
+
+
+    val filteredDict = filterTopKsPerDim(featureDict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue))
+    val validFeatureHashKeys = collectDistinctFeatureHashes(ss, filteredDict)
+    val validFeatureHashKeysBCast = ss.sparkContext.broadcast(validFeatureHashKeys)
+
+    filterWalLogAgg(walLogAgg, validFeatureHashKeysBCast).toDF()
+  }
+
+  override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg")
+}
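
How this process is wired, as far as the code above shows: inputMap must carry
two registered DataFrames, looked up via the mandatory "featureDict" (a dict
with dim/value/rank columns) and "walLogAgg" options, while "maxRankPerDim"
and "defaultMaxRank" are optional cutoffs. An illustrative options map (the
values here are placeholders, not taken from the commit):

    val options = Map(
      "featureDict"    -> "featureDict",           // inputMap key of the dim/value/rank dict
      "walLogAgg"      -> "walLogAgg",             // inputMap key of the aggregated wal logs
      "maxRankPerDim"  -> """{"country": 100}""",  // optional per-dim rank cutoff (JSON)
      "defaultMaxRank" -> "1000"                   // optional fallback cutoff
    )

Note that .as[WalLogAgg], mapPartitions over a Dataset, and .toDF() all rely
on implicit Spark encoders, so an import ss.implicits._ is assumed to be in
scope in execute.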

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
index ddf7037..4ede70b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
@@ -8,13 +8,8 @@ object FeatureIndexParam {
 case class FeatureIndexParam(minUserCount: Option[Long],
                              countColumnName: Option[String],
                              samplePointsPerPartitionHint: Option[Int],
-                             numOfPartitions: Option[Int],
-                             maxRankPerDim: Option[Map[String, Int]],
-                             defaultMaxRank: Option[Int],
-                             dictPath: Option[String]) {
+                             numOfPartitions: Option[Int]) {
   import FeatureIndexParam._
   val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
   val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
-  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
-  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
new file mode 100644
index 0000000..4b2209f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+class FilterWalLogAggParam(maxRankPerDim: Option[Map[String, Int]],
+                           defaultMaxRank: Option[Int]) {
+  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
+  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
+}
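
The ranking knobs dropped from FeatureIndexParam reappear here with the same
defaults (empty map, Int.MaxValue). A quick sketch of how the Option-valued
arguments resolve:

    import org.apache.s2graph.s2jobs.wal.process.params.FilterWalLogAggParam

    val param = new FilterWalLogAggParam(maxRankPerDim = None, defaultMaxRank = Some(100))
    // param._maxRankPerDim  == Map.empty  (no per-dim overrides)
    // param._defaultMaxRank == 100        (would be Int.MaxValue when None)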
