add FilterWalLogAggProcess.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d9af2b81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d9af2b81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d9af2b81

Branch: refs/heads/master
Commit: d9af2b816e4745a7ca0d8a66964f124ac5779988
Parents: 5122e98
Author: DO YUNG YOON <[email protected]>
Authored: Mon Aug 13 18:22:30 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Mon Aug 13 18:22:30 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/wal/WalLog.scala  | 17 +++++
 .../wal/process/FeatureIndexProcess.scala       | 49 +------------
 .../wal/process/FilterWalLogAggProcess.scala    | 75 ++++++++++++++++++++
 .../wal/process/params/FeatureIndexParam.scala  |  7 +-
 .../process/params/FilterWalLogAggParam.scala   |  7 ++
 5 files changed, 101 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
index f4a670b..e4c3467 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/WalLog.scala
@@ -1,5 +1,6 @@
 package org.apache.s2graph.s2jobs.wal
 
+import com.google.common.hash.Hashing
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.s2jobs.wal.process.params.AggregateParam
 import org.apache.s2graph.s2jobs.wal.utils.BoundedPriorityQueue
@@ -16,6 +17,22 @@ object WalLogAgg {
     new WalLogAgg(walLog.from, Seq(walLog), walLog.timestamp, walLog.timestamp)
   }
 
+  def toFeatureHash(dim: String, value: String): Long = {
+    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
+  }
+
+  def filter(walLogAgg: WalLogAgg, validFeatureHashKeys: Set[Long]) = {
+    val filtered = walLogAgg.logs.map { walLog =>
+      val fields = Json.parse(walLog.props).as[JsObject].fields.filter { case (dim, jsValue) =>
+        validFeatureHashKeys(toFeatureHash(dim, JSONParser.jsValueToString(jsValue)))
+      }
+
+      walLog.copy(props = Json.toJson(fields).as[JsObject].toString)
+    }
+
+    walLogAgg.copy(logs = filtered)
+  }
+
   def merge(iter: Iterator[WalLogAgg], param: AggregateParam)(implicit ord: Ordering[WalLog]) = {
     val heap = new BoundedPriorityQueue[WalLog](param.heapSize)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
index 7d04334..073b0cf 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FeatureIndexProcess.scala
@@ -1,16 +1,13 @@
 package org.apache.s2graph.s2jobs.wal.process
 
-import com.google.common.hash.Hashing
 import org.apache.s2graph.s2jobs.task.TaskConf
 import org.apache.s2graph.s2jobs.wal.process.params.FeatureIndexParam
 import org.apache.s2graph.s2jobs.wal.transformer._
 import org.apache.s2graph.s2jobs.wal.udfs.WalLogUDF
 import org.apache.s2graph.s2jobs.wal.{DimVal, WalLog}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
-import play.api.libs.json.{JsObject, Json}
 
 import scala.collection.mutable
 
@@ -62,31 +59,6 @@ object FeatureIndexProcess {
       .withColumn("count", col("dim_count._2"))
       .select("dim", "value", "count", "rank")
   }
-
-  def toFeatureHash(dim: String, value: String): Long = {
-    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()
-  }
-
-  def collectDistinctFeatureHashes(ss: SparkSession,
-                                   filteredDict: DataFrame): Array[Long] = {
-    import ss.implicits._
-
-    val featureHashUDF = udf((dim: String, value: String) => toFeatureHash(dim, value))
-
-    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
-      .select("featureHash")
-      .distinct().as[Long].collect()
-  }
-
-  def filterTopKsPerDim(dict: DataFrame,
-                        maxRankPerDim: Broadcast[Map[String, Int]],
-                        defaultMaxRank: Int): DataFrame = {
-    val filterUDF = udf((dim: String, rank: Long) => {
-      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
-    })
-
-    dict.filter(filterUDF(col("dim"), col("rank")))
-  }
 }
 
 case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
@@ -98,20 +70,11 @@ case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2
     val numOfPartitions = taskConf.options.get("numOfPartitions").map(_.toInt)
     val samplePointsPerPartitionHint = taskConf.options.get("samplePointsPerPartitionHint").map(_.toInt)
     val minUserCount = taskConf.options.get("minUserCount").map(_.toLong)
-    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
-      val json = Json.parse(s).as[JsObject]
-      json.fieldSet.map { case (key, jsValue) => key -> jsValue.as[Int] }.toMap
-    }
-    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
-    val dictPath = taskConf.options.get("dictPath")
 
     numOfPartitions.map { d => ss.sqlContext.setConf("spark.sql.shuffle.partitions", d.toString) }
 
-    // val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
-
     val param = FeatureIndexParam(minUserCount = minUserCount, countColumnName = Option(countColumnName),
-      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint,
-      maxRankPerDim = maxRankPerDim, defaultMaxRank = defaultMaxRank, dictPath = dictPath
+      numOfPartitions = numOfPartitions, samplePointsPerPartitionHint = samplePointsPerPartitionHint
     )
 
     val edges = taskConf.inputs.tail.foldLeft(inputMap(taskConf.inputs.head)) { case (prev, cur) =>
@@ -129,16 +92,6 @@ case class FeatureIndexProcess(taskConf: TaskConf) extends org.apache.s2graph.s2
     val dict = buildDictionary(ss, rawFeatures, param, dimValColumnName)
 
     dict
-    //TODO: filter topKs per dim, then build valid dimValLs.
-    // then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal.
-
-    // dictPath.foreach { path => dict.write.mode(SaveMode.Overwrite).parquet(path) }
-    //
-    // val filteredDict = filterTopKsPerDim(dict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue))
-    // val distinctFeatureHashes = collectDistinctFeatureHashes(ss, filteredDict)
-    // val distinctFeatureHashesBCast = ss.sparkContext.broadcast(distinctFeatureHashes)
-
-    // filteredDict
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
new file mode 100644
index 0000000..3c546a7
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/FilterWalLogAggProcess.scala
@@ -0,0 +1,75 @@
+package org.apache.s2graph.s2jobs.wal.process
+
+import org.apache.s2graph.s2jobs.task.TaskConf
+import org.apache.s2graph.s2jobs.wal.WalLogAgg
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import play.api.libs.json.{JsObject, Json}
+
+object FilterWalLogAggProcess {
+
+  def collectDistinctFeatureHashes(ss: SparkSession,
+                                   filteredDict: DataFrame): Array[Long] = {
+    import ss.implicits._
+
+    val featureHashUDF = udf((dim: String, value: String) => WalLogAgg.toFeatureHash(dim, value))
+
+    filteredDict.withColumn("featureHash", featureHashUDF(col("dim"), col("value")))
+      .select("featureHash")
+      .distinct().as[Long].collect()
+  }
+
+  def filterTopKsPerDim(dict: DataFrame,
+                        maxRankPerDim: Broadcast[Map[String, Int]],
+                        defaultMaxRank: Int): DataFrame = {
+    val filterUDF = udf((dim: String, rank: Long) => {
+      rank < maxRankPerDim.value.getOrElse(dim, defaultMaxRank)
+    })
+
+    dict.filter(filterUDF(col("dim"), col("rank")))
+  }
+
+  def filterWalLogAgg(walLogAgg: Dataset[WalLogAgg],
+                      validFeatureHashKeysBCast: Broadcast[Array[Long]]) = {
+    walLogAgg.mapPartitions { iter =>
+      val validFeatureHashKeys = validFeatureHashKeysBCast.value.toSet
+
+      iter.map { walLogAgg =>
+        WalLogAgg.filter(walLogAgg, validFeatureHashKeys)
+      }
+    }
+  }
+}
+
+class FilterWalLogAggProcess(taskConf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(taskConf) {
+
+  import FilterWalLogAggProcess._
+
+  /*
+    filter topKs per dim, then build valid dimValLs.
+    then broadcast valid dimValLs to original dataframe, and filter out not valid dimVal.
+   */
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    val maxRankPerDim = taskConf.options.get("maxRankPerDim").map { s =>
+      Json.parse(s).as[JsObject].fields.map { case (k, jsValue) =>
+        k -> jsValue.as[Int]
+      }.toMap
+    }
+    val maxRankPerDimBCast = ss.sparkContext.broadcast(maxRankPerDim.getOrElse(Map.empty))
+
+    val defaultMaxRank = taskConf.options.get("defaultMaxRank").map(_.toInt)
+
+    val featureDict = inputMap(taskConf.options("featureDict"))
+    val walLogAgg = inputMap(taskConf.options("walLogAgg")).as[WalLogAgg]
+
+
+    val filteredDict = filterTopKsPerDim(featureDict, maxRankPerDimBCast, defaultMaxRank.getOrElse(Int.MaxValue))
+    val validFeatureHashKeys = collectDistinctFeatureHashes(ss, filteredDict)
+    val validFeatureHashKeysBCast = ss.sparkContext.broadcast(validFeatureHashKeys)
+
+    filterWalLogAgg(walLogAgg, validFeatureHashKeysBCast).toDF()
+  }
+
+  override def mandatoryOptions: Set[String] = Set("featureDict", "walLogAgg")
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
index ddf7037..4ede70b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FeatureIndexParam.scala
@@ -8,13 +8,8 @@ object FeatureIndexParam {
 case class FeatureIndexParam(minUserCount: Option[Long],
                              countColumnName: Option[String],
                              samplePointsPerPartitionHint: Option[Int],
-                             numOfPartitions: Option[Int],
-                             maxRankPerDim: Option[Map[String, Int]],
-                             defaultMaxRank: Option[Int],
-                             dictPath: Option[String]) {
+                             numOfPartitions: Option[Int]) {
   import FeatureIndexParam._
   val _countColumnName = countColumnName.getOrElse(defaultCountColumnName)
   val _minUserCount = minUserCount.getOrElse(defaultMinUserCount)
-  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
-  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d9af2b81/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
new file mode 100644
index 0000000..4b2209f
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/wal/process/params/FilterWalLogAggParam.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.s2jobs.wal.process.params
+
+class FilterWalLogAggParam(maxRankPerDim: Option[Map[String, Int]],
+                           defaultMaxRank: Option[Int]) {
+  val _maxRankPerDim = maxRankPerDim.getOrElse(Map.empty)
+  val _defaultMaxRank = defaultMaxRank.getOrElse(Int.MaxValue)
+}
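
The additions above reduce to a simple scheme: WalLogAgg.toFeatureHash hashes a "dim:value" pair with murmur3_128, and WalLogAgg.filter keeps only the props whose feature hash appears in a whitelist built from the top-K feature dictionary. The following is a minimal, self-contained Scala sketch of that idea, not part of the commit: the FeatureHashSketch object and the sample dims/values are hypothetical, only Guava and play-json are assumed on the classpath, and the local string conversion stands in for JSONParser.jsValueToString.

import com.google.common.hash.Hashing
import play.api.libs.json.{JsObject, JsString, Json}

object FeatureHashSketch {
  // same hashing scheme as WalLogAgg.toFeatureHash in the diff above
  def toFeatureHash(dim: String, value: String): Long =
    Hashing.murmur3_128().hashBytes(s"$dim:$value".getBytes("UTF-8")).asLong()

  def main(args: Array[String]): Unit = {
    // whitelist as collectDistinctFeatureHashes would produce it: only "gender:F" survives
    val validFeatureHashKeys: Set[Long] = Set(toFeatureHash("gender", "F"))

    // props of a single WalLog, given as a JSON string (sample values are made up)
    val props = Json.parse("""{"gender": "F", "age": "20"}""").as[JsObject]

    // drop every field whose feature hash is not whitelisted (the shape of WalLogAgg.filter)
    val filtered = JsObject(props.fields.filter { case (dim, jsValue) =>
      val value = jsValue match {
        case JsString(s) => s          // stand-in for JSONParser.jsValueToString
        case other       => other.toString
      }
      validFeatureHashKeys(toFeatureHash(dim, value))
    })

    println(filtered) // {"gender":"F"} -- "age" is dropped
  }
}

On the configuration side, FilterWalLogAggProcess reads everything it needs from taskConf.options. The key names below are the ones the process actually reads ("featureDict" and "walLogAgg" are mandatory input names, "maxRankPerDim" and "defaultMaxRank" are optional thresholds); the concrete values are only a hypothetical example:

// illustrative options for FilterWalLogAggProcess; values are hypothetical
val options: Map[String, String] = Map(
  "featureDict"    -> "feature_dict",                 // input holding the dim/value/rank dictionary
  "walLogAgg"      -> "wal_log_agg",                  // input holding the Dataset[WalLogAgg] to filter
  "maxRankPerDim"  -> """{"gender": 2, "age": 10}""", // per-dim top-K, parsed as a JSON object of Ints
  "defaultMaxRank" -> "100"                           // fallback K for dims not listed in maxRankPerDim
)

Dims missing from maxRankPerDim fall back to defaultMaxRank (Int.MaxValue when that option is absent), so omitting both thresholds keeps every dictionary entry and only props absent from featureDict are dropped from the WAL logs.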
