http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala b/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala deleted file mode 100644 index 71784fa..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/EraseDailyCounter.scala +++ /dev/null @@ -1,133 +0,0 @@ -package s2.counter - -import java.text.SimpleDateFormat - -import kafka.producer.KeyedMessage -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import play.api.libs.json.Json -import s2.config.{S2ConfigFactory, StreamingConfig} -import s2.counter.core.ExactCounter.ExactValueMap -import s2.counter.core._ -import s2.counter.core.v1.ExactStorageHBase -import s2.counter.core.v2.ExactStorageGraph -import s2.models.{Counter, CounterModel, DBModel} -import s2.spark.{SparkApp, WithKafka} - -import scala.collection.mutable -import scala.collection.mutable.{HashMap => MutableHashMap} - -/** - * Created by hsleep([email protected]) on 15. 7. 1.. - */ -object EraseDailyCounter extends SparkApp with WithKafka { - import scala.concurrent.ExecutionContext.Implicits.global - - lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) - - def valueToEtlItem(policy: Counter, key: ExactKeyTrait, values: ExactValueMap): Seq[CounterEtlItem] = { - if (values.nonEmpty) { - for { - (eq, value) <- filter(values.toList) - } yield { - CounterEtlItem(eq.tq.ts, policy.service, policy.action, key.itemKey, Json.toJson(eq.dimKeyValues), Json.toJson(Map("value" -> -value))) - } - } else { - Nil - } - } - - def filter(values: List[(ExactQualifier, Long)]): List[(ExactQualifier, Long)] = { - val sorted = values.sortBy(_._1.dimKeyValues.size).reverse - val (eq, value) = sorted.head - val dimKeys = eq.dimKeyValues.toSeq - val flat = { - for { - i <- 0 to dimKeys.length - comb <- dimKeys.combinations(i) - } yield { - ExactQualifier(eq.tq, comb.toMap) -> value - } - }.toMap - -// println("flat >>>", flat) - - val valuesMap = values.toMap - val remain = (valuesMap ++ flat.map { case (k, v) => - k -> (valuesMap(k) - v) - }).filter(_._2 > 0).toList - -// println("remain >>>", remain) - - if (remain.isEmpty) { - List((eq, value)) - } else { - (eq, value) :: filter(remain) - } - } - - def produce(policy: Counter, exactRdd: RDD[(ExactKeyTrait, ExactValueMap)]): Unit = { - exactRdd.mapPartitions { part => - for { - (key, values) <- part - item <- valueToEtlItem(policy, key, values) - } yield { - item - } - }.foreachPartition { part => - val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]] - part.foreach { item => - val k = getPartKey(item.item, 20) - val values = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem]) - values += item - m.update(k, values) - } - m.foreach { case (k, v) => - v.map(_.toKafkaMessage).grouped(1000).foreach { grouped => -// println(grouped) - producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n"))) - } - } - } - } - - def rddToExactRdd(policy: Counter, date: String, rdd: RDD[String]): RDD[(ExactKeyTrait, ExactValueMap)] = { - val dateFormat = new SimpleDateFormat("yyyy-MM-dd") - val fromTs = dateFormat.parse(date).getTime - val toTs = fromTs + 23 * 60 * 60 * 1000 - - rdd.mapPartitions { part => - val exactCounter = policy.version match { - case VERSION_1 => new 
ExactCounter(S2ConfigFactory.config, new ExactStorageHBase(S2ConfigFactory.config)) - case VERSION_2 => new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) - } - - for { - line <- part - FetchedCounts(exactKey, qualifierWithCountMap) <- exactCounter.getCount(policy, line, Array(TimedQualifier.IntervalUnit.DAILY), fromTs, toTs) - } yield { - (exactKey, qualifierWithCountMap) - } - } - } - - lazy val className = getClass.getName.stripSuffix("$") - - override def run(): Unit = { - validateArgument("service", "action", "date", "file", "op") - DBModel.initialize(S2ConfigFactory.config) - - val (service, action, date, file, op) = (args(0), args(1), args(2), args(3), args(4)) - val conf = sparkConf(s"$className: $service.$action") - - val ctx = new SparkContext(conf) - - val rdd = ctx.textFile(file, 20) - - val counterModel = new CounterModel(S2ConfigFactory.config) - - val policy = counterModel.findByServiceAction(service, action).get - val exactRdd = rddToExactRdd(policy, date, rdd) - produce(policy, exactRdd) - } -}
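The filter routine in the deleted EraseDailyCounter above takes a single item's counts at every dimension granularity, repeatedly picks the most specific qualifier, subtracts its value from every coarser roll-up (all subsets of its dimension keys), and recurses on what remains; emitting negative deltas only for the entries it returns is then enough to erase the roll-ups as well. A minimal standalone sketch of that decomposition, using plain Maps in place of ExactQualifier (names and helper types here are illustrative only, not taken from the commit):

    // Illustrative sketch, not part of this commit: mirrors the subset-expansion
    // accounting in EraseDailyCounter.filter using plain Maps.
    object DimensionRollupSketch {
      type Dims = Map[String, String]

      // every dimension-key subset of dims, e.g. {a, b} -> {}, {a}, {b}, {a, b}
      def subsets(dims: Dims): Seq[Dims] =
        (0 to dims.size).flatMap(i => dims.toSeq.combinations(i).map(_.toMap))

      // decompose observed roll-up counts into the minimal fine-grained entries
      // that, once expanded to all their subsets, reproduce the observed counts
      def decompose(values: Map[Dims, Long]): List[(Dims, Long)] =
        if (values.isEmpty) Nil
        else {
          val (dims, count) = values.toSeq.maxBy(_._1.size)
          val remain = subsets(dims).foldLeft(values) { (acc, d) =>
            acc.updated(d, acc.getOrElse(d, 0L) - count)
          }.filter(_._2 > 0)
          (dims, count) :: decompose(remain)
        }
    }

    // example: a count of 3 recorded for dimension {a -> x} and rolled up into the
    // empty-dimension total collapses back to the single fine-grained entry
    // decompose(Map(Map.empty[String, String] -> 3L, Map("a" -> "x") -> 3L))
    //   == List((Map("a" -> "x"), 3L))
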
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala deleted file mode 100644 index cefce65..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlFunctions.scala +++ /dev/null @@ -1,89 +0,0 @@ -package s2.counter.core - -import com.kakao.s2graph.core.{Edge, Graph, GraphUtil} -import org.apache.spark.Logging -import play.api.libs.json._ -import s2.config.{S2ConfigFactory, StreamingConfig} -import s2.models.CounterModel - -import scala.collection.mutable.{HashMap => MutableHashMap} - -/** - * Created by hsleep([email protected]) on 15. 3. 17.. - */ -object CounterEtlFunctions extends Logging { - lazy val filterOps = Seq("insert", "insertBulk", "update", "increment").map(op => GraphUtil.operations(op)) - lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE - lazy val config = S2ConfigFactory.config - lazy val counterModel = new CounterModel(config) - - def logToEdge(line: String): Option[Edge] = { - for { - elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge] - edge <- Some(elem.asInstanceOf[Edge]).filter { x => - filterOps.contains(x.op) - } - } yield { - edge - } - } - - def parseEdgeFormat(line: String): Option[CounterEtlItem] = { - /** - * 1427082276804 insert edge 19073318 52453027_93524145648511699 story_user_ch_doc_view {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"} - */ - for { - elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge] - edge <- Some(elem.asInstanceOf[Edge]).filter { x => - filterOps.contains(x.op) - } - } yield { - val label = edge.label - val labelName = label.label - val tgtService = label.tgtColumn.service.serviceName - val tgtId = edge.tgtVertex.innerId.toString() - val srcId = edge.srcVertex.innerId.toString() - - // make empty property if no exist edge property - val dimension = Json.parse(Some(GraphUtil.split(line)).filter(_.length >= 7).map(_(6)).getOrElse("{}")) - val bucketKeys = Seq("_from") - val bucketKeyValues = { - for { - variable <- bucketKeys - } yield { - val jsValue = variable match { - case "_from" => JsString(srcId) - case s => dimension \ s - } - s"[[$variable]]" -> jsValue - } - } - val property = Json.toJson(bucketKeyValues :+ ("value" -> JsString("1")) toMap) -// val property = Json.toJson(Map("_from" -> srcId, "_to" -> tgtId, "value" -> "1")) - - CounterEtlItem(edge.ts, tgtService, labelName, tgtId, dimension, property) - } - } - - def parseEdgeFormat(lines: List[String]): List[CounterEtlItem] = { - for { - line <- lines - item <- parseEdgeFormat(line) - } yield { - item - } - } - - def checkPolicyAndMergeDimension(service: String, action: String, items: List[CounterEtlItem]): List[CounterEtlItem] = { - counterModel.findByServiceAction(service, action).map { policy => - if (policy.useProfile) { - policy.bucketImpId match { - case Some(_) => DimensionProps.mergeDimension(policy, items) - case None => Nil - } - } else { - items - } - }.getOrElse(Nil) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala 
b/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala deleted file mode 100644 index 1b0f3cd..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/core/CounterEtlItem.scala +++ /dev/null @@ -1,43 +0,0 @@ -package s2.counter.core - -import org.slf4j.LoggerFactory -import play.api.libs.json._ -import s2.util.UnitConverter - -import scala.util.{Failure, Success, Try} - -/** -* Created by hsleep([email protected]) on 15. 10. 6.. -*/ -case class CounterEtlItem(ts: Long, service: String, action: String, item: String, dimension: JsValue, property: JsValue, useProfile: Boolean = false) { - def toKafkaMessage: String = { - s"$ts\t$service\t$action\t$item\t${dimension.toString()}\t${property.toString()}" - } - - lazy val value = { - property \ "value" match { - case JsNumber(n) => n.longValue() - case JsString(s) => s.toLong - case _: JsUndefined => 1L - case _ => throw new Exception("wrong type") - } - } -} - -object CounterEtlItem { - val log = LoggerFactory.getLogger(this.getClass) - - def apply(line: String): Option[CounterEtlItem] = { - Try { - val Array(ts, service, action, item, dimension, property) = line.split('\t') - CounterEtlItem(UnitConverter.toMillis(ts.toLong), service, action, item, Json.parse(dimension), Json.parse(property)) - } match { - case Success(item) => - Some(item) - case Failure(ex) => - log.error(">>> failed") - log.error(s"${ex.toString}: $line") - None - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala deleted file mode 100644 index a36b55f..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/core/CounterFunctions.scala +++ /dev/null @@ -1,446 +0,0 @@ -package s2.counter.core - -import com.kakao.s2graph.core.GraphUtil -import kafka.producer.KeyedMessage -import org.apache.spark.rdd.RDD -import org.apache.spark.{Accumulable, Logging} -import play.api.libs.json.{JsString, JsNumber, JsValue, Json} -import s2.config.{S2ConfigFactory, StreamingConfig} -import s2.counter.TrxLog -import s2.counter.core.ExactCounter.ExactValueMap -import s2.counter.core.RankingCounter.RankingValueMap -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph} -import s2.models.{Counter, DBModel, DefaultCounterModel} -import s2.spark.WithKafka - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.language.postfixOps -import scala.util.Try - -/** - * Created by hsleep([email protected]) on 15. 10. 6.. 
- */ -object CounterFunctions extends Logging with WithKafka { - import scala.concurrent.ExecutionContext.Implicits.global - - private val K_MAX = 500 - - val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) - val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) - - lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) - - type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)] - - val initialize = { - logInfo("initialize CounterFunctions") - DBModel.initialize(S2ConfigFactory.config) - true - } - - def getCountValue(policy: Counter, item: CounterEtlItem): ExactValueMap = { - for { - dimKeys <- policy.dimensionList - dimValues <- getDimensionValues(item.dimension, dimKeys).toSeq - eq <- ExactQualifier.getQualifiers(policy.intervals.map(IntervalUnit.withName), item.ts, dimKeys.zip(dimValues).toMap) - } yield { - eq -> item.value - } - }.toMap - - def getDimensionValues(dimension: JsValue, keys: Array[String]): Option[Array[String]] = { - Try { - for { - k <- keys - jsValue = dimension \ k - } yield { - jsValue match { - case JsNumber(n) => n.toString() - case JsString(s) => s - case _ => throw new Exception() - } - } - }.toOption - } - - def exactMapper(item: CounterEtlItem): Option[(ExactKeyTrait, ExactValueMap)] = { - DefaultCounterModel.findByServiceAction(item.service, item.action).map { policy => - (ExactKey(policy, item.item, checkItemType = true), getCountValue(policy, item)) - } - } - - def rankingMapper(row: ItemRankingRow): Seq[(RankingKey, RankingValueMap)] = { - // normal ranking - for { - (eq, rv) <- row.value - } yield { - (RankingKey(row.key.policyId, row.key.version, eq), Map(row.key.itemKey -> rv)) - } - }.toSeq - - def logToRankValue(log: TrxLog): Option[(ExactKeyTrait, Map[ExactQualifier, RankingValue])] = { - DefaultCounterModel.findById(log.policyId).map { policy => - val key = ExactKey(policy, log.item, checkItemType = false) - val value = { - for { - result <- log.results - } yield { - ExactQualifier(TimedQualifier(result.interval, result.ts), result.dimension) -> RankingValue(result.result, result.value) - } - }.toMap - key -> value - } - } - - def reduceValue[T, U](op: (U, U) => U, default: U)(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = { - m1 ++ m2.map { case (k, v) => - k -> op(m1.getOrElse(k, default), v) - } - } - - def makeExactRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = { - rdd.mapPartitions { part => - assert(initialize) - for { - (k, v) <- part - line <- GraphUtil.parseString(v) - item <- CounterEtlItem(line).toSeq - ev <- exactMapper(item).toSeq - } yield { - ev - } - }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions) - } - - def makeRankingRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { - val logRdd = makeTrxLogRdd(rdd, numPartitions) - makeRankingRddFromTrxLog(logRdd, numPartitions) - } - - def makeRankingRddFromTrxLog(rdd: RDD[TrxLog], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { - val itemRankingRdd = makeItemRankingRdd(rdd, numPartitions).cache() - try { - rankingCount(itemRankingRdd, numPartitions) union - rateRankingCount(itemRankingRdd, numPartitions) union - trendRankingCount(itemRankingRdd, numPartitions) coalesce numPartitions - } finally { - itemRankingRdd.unpersist(false) - } - } - - def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: 
Int): RDD[TrxLog] = { - rdd.mapPartitions { part => - assert(initialize) - for { - (k, v) <- part - line <- GraphUtil.parseString(v) - trxLog = Json.parse(line).as[TrxLog] if trxLog.success - } yield { - trxLog - } - } - } - - def rankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { - rdd.mapPartitions { part => - for { - row <- part - rv <- rankingMapper(row) - } yield { - rv - } - }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions) - } - - case class ItemRankingRow(key: ExactKeyTrait, value: Map[ExactQualifier, RankingValue]) - - def makeItemRankingRdd(rdd: RDD[TrxLog], numPartitions: Int): RDD[ItemRankingRow] = { - rdd.mapPartitions { part => - for { - log <- part - rv <- logToRankValue(log) - } yield { - rv - } - }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions).mapPartitions { part => - for { - (key, value) <- part - } yield { - ItemRankingRow(key, value) - } - } - } - - def mapTrendRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = { - for { - row <- rows - trendPolicy <- DefaultCounterModel.findByTrendActionId(row.key.policyId) - } yield { - val key = ExactKey(trendPolicy, row.key.itemKey, checkItemType = false) - val value = row.value.filter { case (eq, rv) => - // eq filter by rate policy dimension - trendPolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys } - }.map { case (eq, rv) => - eq -> RateRankingValue(rv.score, -1) - } - (key, value) - } - } - - def mapRateRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = { - val actionPart = { - for { - row <- rows - ratePolicy <- DefaultCounterModel.findByRateActionId(row.key.policyId) - } yield { - val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false) - val value = row.value.filter { case (eq, rv) => - // eq filter by rate policy dimension - ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys } - }.map { case (eq, rv) => - eq -> RateRankingValue(rv.score, -1) - } - (key, value) - } - } - - val basePart = { - for { - row <- rows - ratePolicy <- DefaultCounterModel.findByRateBaseId(row.key.policyId) - } yield { - val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false) - val value = row.value.filter { case (eq, rv) => - // eq filter by rate policy dimension - ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys } - }.map { case (eq, rv) => - eq -> RateRankingValue(-1, rv.score) - } - (key, value) - } - } - - actionPart ++ basePart - } - - def trendRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { - rdd.mapPartitions { part => - mapTrendRankingValue(part.toSeq) toIterator - }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part => - val missingByPolicy = { - for { - (key, value) <- part.toSeq - trendPolicy <- DefaultCounterModel.findById(key.policyId).toSeq - actionId <- trendPolicy.rateActionId.toSeq - actionPolicy <- DefaultCounterModel.findById(actionId).toSeq - } yield { - // filter total eq - val missingQualifiersWithRRV = value.filterKeys { eq => eq.tq.q != IntervalUnit.TOTAL } - (actionPolicy, key, missingQualifiersWithRRV) - } - }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3))) - - val filled = { - for { - (policy, missing) <- missingByPolicy.toSeq - keyWithPast = 
exactCounter.getPastCounts(policy, missing.map { case (k, v) => k.itemKey -> v.keys.toSeq }) - (key, current) <- missing - } yield { - val past = keyWithPast.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long]) - val base = past.mapValues(l => RateRankingValue(-1, l)) -// log.warn(s"trend: $policy $key -> $current $base") - key -> reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, base) - } - } - -// filled.foreach(println) - - { - // filter by rate threshold - for { - (key, value) <- filled - ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq - (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue) - } yield { - (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue)) - } - } toIterator - }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions) - } - - def rateRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { - rdd.mapPartitions { part => - mapRateRankingValue(part.toSeq) toIterator - }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part => - val seq = part.toSeq -// seq.foreach(x => println(s"item ranking row>> $x")) - - // find and evaluate action value is -1 - val actionMissingByPolicy = { - for { - (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore == -1 } - ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq - actionId <- ratePolicy.rateActionId.toSeq - actionPolicy <- DefaultCounterModel.findById(actionId) - } yield { - (actionPolicy, key, value.filter { case (eq, rrv) => rrv.actionScore == -1 }) - } - }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3))) - - val actionFilled = { - for { - (actionPolicy, actionMissing) <- actionMissingByPolicy.toSeq - keyWithRelated = exactCounter.getRelatedCounts(actionPolicy, actionMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq }) - (key, current) <- actionMissing - } yield { - val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long]) - val found = related.mapValues(l => RateRankingValue(l, -1)) - val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found) -// log.warn(s"action: $key -> $found $filled") - key -> filled - } - } - -// actionFilled.foreach(x => println(s"action filled>> $x")) - - // find and evaluate base value is -1 - val baseMissingByPolicy = { - for { - (key, value) <- seq if value.exists { case (eq, rrv) => rrv.baseScore == -1 } - ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq - baseId <- ratePolicy.rateBaseId.toSeq - basePolicy <- DefaultCounterModel.findById(baseId) - } yield { - (basePolicy, key, value.filter { case (eq, rrv) => rrv.baseScore == -1 }) - } - }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3))) - - val baseFilled = { - for { - (basePolicy, baseMissing) <- baseMissingByPolicy.toSeq - keyWithRelated = exactCounter.getRelatedCounts(basePolicy, baseMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq }) - (key, current) <- baseMissing - } yield { - val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long]) - val found = related.mapValues(l => RateRankingValue(-1, l)) - val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found) -// log.warn(s"base: $basePolicy $key -> $found $filled") - key -> filled - } - } - -// baseFilled.foreach(x => println(s"base filled>> $x")) - - val alreadyFilled = { 
- for { - (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 } - } yield { - key -> value.filter { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 } - } - } - - val rtn = { - // filter by rate threshold - for { - (key, value) <- actionFilled ++ baseFilled ++ alreadyFilled - ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq - (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue) - } yield { - (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue)) - } - } - rtn.toIterator - }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions) - } - - def insertBlobValue(keys: Seq[BlobExactKey], acc: HashMapAccumulable): Unit = { - val keyByPolicy = { - for { - key <- keys - policy <- DefaultCounterModel.findById(key.policyId) - } yield { - (policy, key) - } - }.groupBy(_._1).mapValues(values => values.map(_._2)) - - for { - (policy, allKeys) <- keyByPolicy - keys <- allKeys.grouped(10) - success <- exactCounter.insertBlobValue(policy, keys) - } yield { - success match { - case true => acc += ("BLOB", 1) - case false => acc += ("BLOBFailed", 1) - } - } - } - - def updateExactCounter(counts: Seq[(ExactKeyTrait, ExactValueMap)], acc: HashMapAccumulable): Seq[TrxLog] = { - val countsByPolicy = { - for { - (key, count) <- counts - policy <- DefaultCounterModel.findById(key.policyId) - } yield { - (policy, (key, count)) - } - }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2)) - - for { - (policy, allCounts) <- countsByPolicy - counts <- allCounts.grouped(10) - trxLog <- exactCounter.updateCount(policy, counts) - } yield { - trxLog.success match { - case true => acc += (s"ExactV${policy.version}", 1) - case false => acc += (s"ExactFailedV${policy.version}", 1) - } - trxLog - } - }.toSeq - - def exactCountFromEtl(rdd: RDD[CounterEtlItem], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = { - rdd.mapPartitions { part => - for { - item <- part - ev <- exactMapper(item).toSeq - } yield { - ev - } - }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions) - } - - def updateRankingCounter(values: TraversableOnce[(RankingKey, RankingValueMap)], acc: HashMapAccumulable): Unit = { - assert(initialize) - val valuesByPolicy = { - for { - (key, value) <- values.toSeq - policy <- DefaultCounterModel.findById(key.policyId) - if policy.useRank && rankingCounter.ready(policy) // update only rank counter enabled and ready - } yield { - (policy, (key, value)) - } - }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2)) - - for { - (policy, allValues) <- valuesByPolicy - groupedValues <- allValues.grouped(10) - } { - rankingCounter.update(groupedValues, K_MAX) - acc += (s"RankingV${policy.version}", groupedValues.length) - } - } - - def produceTrxLog(trxLogs: TraversableOnce[TrxLog]): Unit = { - for { - trxLog <- trxLogs - } { - val topic = trxLog.success match { - case true => StreamingConfig.KAFKA_TOPIC_COUNTER_TRX - case false => StreamingConfig.KAFKA_TOPIC_COUNTER_FAIL - } - val msg = new KeyedMessage[String, String](topic, s"${trxLog.policyId}${trxLog.item}", Json.toJson(trxLog).toString()) - producer.send(msg) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala ---------------------------------------------------------------------- diff --git 
a/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala b/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala deleted file mode 100644 index 98bc750..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/core/DimensionProps.scala +++ /dev/null @@ -1,154 +0,0 @@ -package s2.counter.core - -import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service} -import org.apache.commons.httpclient.HttpStatus -import org.slf4j.LoggerFactory -import play.api.libs.json._ -import s2.config.StreamingConfig -import s2.models.Counter -import s2.util.{CollectionCache, CollectionCacheConfig, RetryAsync} - -import scala.annotation.tailrec -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import scala.util.Try - -/** - * Created by hsleep([email protected]) on 2015. 10. 6.. - */ -object DimensionProps { - // using play-ws without play app - private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() - private val client = new play.api.libs.ws.ning.NingWSClient(builder.build) - private val log = LoggerFactory.getLogger(this.getClass) - - private val retryCnt = 3 - - val cacheConfig = CollectionCacheConfig(StreamingConfig.PROFILE_CACHE_MAX_SIZE, - StreamingConfig.PROFILE_CACHE_TTL_SECONDS, - negativeCache = true, - 3600 // negative ttl 1 hour - ) - val cache: CollectionCache[Option[JsValue]] = new CollectionCache[Option[JsValue]](cacheConfig) - - @tailrec - private[counter] def makeRequestBody(requestBody: String, keyValues: List[(String, String)]): String = { - keyValues match { - case head :: tail => - makeRequestBody(requestBody.replace(head._1, head._2), tail) - case Nil => requestBody - } - } - - private[counter] def query(bucket: Bucket, item: CounterEtlItem): Future[Option[JsValue]] = { - val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields) - .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") } - .map { case (key, jsValue) => - val replacement = jsValue match { - case JsString(s) => s - case value => value.toString() - } - key -> replacement - }.toList - - val cacheKey = s"${bucket.impressionId}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_") - - cache.withCacheAsync(cacheKey) { - val retryFuture = RetryAsync(retryCnt, withSleep = false) { - val future = bucket.httpVerb.toUpperCase match { - case "GET" => - client.url(bucket.apiPath).get() - case "POST" => - val newBody = makeRequestBody(bucket.requestBody, keyValues) - client.url(bucket.apiPath).post(Json.parse(newBody)) - } - - future.map { resp => - resp.status match { - case HttpStatus.SC_OK => - val json = Json.parse(resp.body) - for { - results <- (json \ "results").asOpt[Seq[JsValue]] - result <- results.headOption - props <- (result \ "props").asOpt[JsValue] - } yield { - props - } - case _ => - log.error(s"${resp.body}(${resp.status}}) item: $item") - None - } - } - } - - // if fail to retry - retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") } - - retryFuture - } - } - - private[counter] def query(service: Service, experiment: Experiment, item: CounterEtlItem): Future[Option[JsValue]] = { - val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields) - .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }.toMap - - val cacheKey = s"${experiment.name}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_") - - cache.withCacheAsync(cacheKey) { - val retryFuture = 
RetryAsync(retryCnt, withSleep = false) { - val url = s"${StreamingConfig.GRAPH_URL}/graphs/experiment/${service.accessToken}/${experiment.name}/0" - val future = client.url(url).post(Json.toJson(keyValues)) - - future.map { resp => - resp.status match { - case HttpStatus.SC_OK => - val json = Json.parse(resp.body) - for { - results <- (json \ "results").asOpt[Seq[JsValue]] - result <- results.headOption - props <- (result \ "props").asOpt[JsValue] - } yield { - props - } - case _ => - log.error(s"${resp.body}(${resp.status}}) item: $item") - None - } - } - } - - // if fail to retry - retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") } - - retryFuture - } - } - - def mergeDimension(policy: Counter, items: List[CounterEtlItem]): List[CounterEtlItem] = { - for { - impId <- policy.bucketImpId - bucket <- Bucket.findByImpressionId(impId) - experiment <- Experiment.findById(bucket.experimentId) - service <- Try { Service.findById(experiment.serviceId) }.toOption - } yield { - val futures = { - for { - item <- items - } yield { - query(service, experiment, item).map { - case Some(jsValue) => - val newDimension = item.dimension.as[JsObject] ++ jsValue.as[JsObject] - item.copy(dimension = newDimension) - case None => item - } - } - } - Await.result(Future.sequence(futures), 10 seconds) - } - }.getOrElse(items) - - def getCacheStatsString: String = { - cache.getStatsString - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala deleted file mode 100644 index 03c42b5..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/stream/EtlStreaming.scala +++ /dev/null @@ -1,116 +0,0 @@ -package s2.counter.stream - -import com.kakao.s2graph.core.{Graph, GraphUtil} -import kafka.producer.KeyedMessage -import kafka.serializer.StringDecoder -import org.apache.spark.streaming.Durations._ -import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions -import org.apache.spark.streaming.kafka.StreamHelper -import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig} -import s2.counter.core.{CounterEtlFunctions, CounterEtlItem, DimensionProps} -import s2.models.{CounterModel, DBModel} -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.concurrent.ExecutionContext - -/** - * Created by hsleep([email protected]) on 15. 10. 6.. 
- */ -object EtlStreaming extends SparkApp with WithKafka { - lazy val config = S2ConfigFactory.config - lazy val s2Config = new S2CounterConfig(config) - lazy val counterModel = new CounterModel(config) - lazy val className = getClass.getName.stripSuffix("$") - lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) - - implicit val graphEx = ExecutionContext.Implicits.global - - val initialize = { - println("streaming initialize") -// Graph(config) - DBModel.initialize(config) - true - } - - val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_ETL) - val strInputTopics = inputTopics.mkString(",") - val groupId = buildKafkaGroupId(strInputTopics, "etl_to_counter") - val kafkaParam = Map( - "group.id" -> groupId, - "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, - "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, - "zookeeper.connection.timeout.ms" -> "10000" - ) - val streamHelper = StreamHelper(kafkaParam) - - override def run(): Unit = { - validateArgument("interval") - val (intervalInSec) = seconds(args(0).toLong) - - val conf = sparkConf(s"$strInputTopics: $className") - val ssc = streamingContext(conf, intervalInSec) - val sc = ssc.sparkContext - - val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - /** - * read message from etl topic and join user profile from graph and then produce whole message to counter topic - */ - val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics) - - // etl logic - stream.foreachRDD { (rdd, ts) => - rdd.foreachPartitionWithOffsetRange { case (osr, part) => - assert(initialize) - - // convert to edge format - val items = { - for { - (k, v) <- part - line <- GraphUtil.parseString(v) - item <- CounterEtlFunctions.parseEdgeFormat(line) - } yield { - acc += ("Edges", 1) - item - } - } - - // join user profile - val joinItems = items.toList.groupBy { e => - (e.service, e.action) - }.flatMap { case ((service, action), v) => - CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v) - } - - // group by kafka partition key and send to kafka - val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]] - joinItems.foreach { item => - if (item.useProfile) { - acc += ("ETL", 1) - } - val k = getPartKey(item.item, 20) - val values: mutable.MutableList[CounterEtlItem] = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem]) - values += item - m.update(k, values) - } - m.foreach { case (k, v) => - v.map(_.toKafkaMessage).grouped(1000).foreach { grouped => - acc += ("Produce", grouped.size) - producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n"))) - } - } - - streamHelper.commitConsumerOffset(osr) - } - - if (ts.milliseconds / 1000 % 60 == 0) { - log.warn(DimensionProps.getCacheStatsString) - } - } - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala deleted file mode 100644 index 2b8ba21..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/stream/ExactCounterStreaming.scala +++ /dev/null @@ -1,72 +0,0 @@ -package s2.counter.stream - -import 
kafka.serializer.StringDecoder -import org.apache.spark.streaming.Durations._ -import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions -import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper} -import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig} -import s2.counter.core.CounterFunctions -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.language.postfixOps - -/** - * Streaming job for counter topic - * Created by hsleep([email protected]) on 15. 1. 15.. - */ -object ExactCounterStreaming extends SparkApp with WithKafka { - lazy val config = S2ConfigFactory.config - lazy val s2Config = new S2CounterConfig(config) - lazy val className = getClass.getName.stripSuffix("$") - - lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) - - val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER) - val strInputTopics = inputTopics.mkString(",") - val groupId = buildKafkaGroupId(strInputTopics, "counter_v2") - val kafkaParam = Map( -// "auto.offset.reset" -> "smallest", - "group.id" -> groupId, - "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, - "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, - "zookeeper.connection.timeout.ms" -> "10000" - ) - val streamHelper = StreamHelper(kafkaParam) - - override def run() = { - validateArgument("interval", "clear") - val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean) - - if (clear) { - streamHelper.kafkaHelper.consumerGroupCleanup() - } - - val conf = sparkConf(s"$strInputTopics: $className") - val ssc = streamingContext(conf, intervalInSec) - val sc = ssc.sparkContext - - implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - // make stream - val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics) - stream.foreachRDD { (rdd, ts) => - val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - - val exactRDD = CounterFunctions.makeExactRdd(rdd, offsets.length) - - // for at-least once semantic - exactRDD.foreachPartitionWithIndex { (i, part) => - // update exact counter - val trxLogs = CounterFunctions.updateExactCounter(part.toSeq, acc) - CounterFunctions.produceTrxLog(trxLogs) - - // commit offset range - streamHelper.commitConsumerOffset(offsets(i)) - } - } - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala deleted file mode 100644 index 39654d3..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/stream/GraphToETLStreaming.scala +++ /dev/null @@ -1,83 +0,0 @@ -package s2.counter.stream - -import com.kakao.s2graph.core.GraphUtil -import kafka.producer.KeyedMessage -import kafka.serializer.StringDecoder -import org.apache.spark.streaming.Durations._ -import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions -import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig} -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable -import scala.collection.mutable.{HashMap => MutableHashMap} - 
-/** - * can be @deprecated - * Created by hsleep([email protected]) on 15. 3. 16.. - */ -object GraphToETLStreaming extends SparkApp with WithKafka { - lazy val config = S2ConfigFactory.config - lazy val s2Config = new S2CounterConfig(config) - lazy val className = getClass.getName.stripSuffix("$") - lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) - - override def run(): Unit = { - validateArgument("interval", "topic") - val (intervalInSec, topic) = (seconds(args(0).toLong), args(1)) - - val groupId = buildKafkaGroupId(topic, "graph_to_etl") - val kafkaParam = Map( -// "auto.offset.reset" -> "smallest", - "group.id" -> groupId, - "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, - "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, - "zookeeper.connection.timeout.ms" -> "10000" - ) - - val conf = sparkConf(s"$topic: $className") - val ssc = streamingContext(conf, intervalInSec) - val sc = ssc.sparkContext - - val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - /** - * consume graphIn topic and produce messages to etl topic - * two purpose - * 1. partition by target vertex id - * 2. expand kafka partition count - */ - val stream = getStreamHelper(kafkaParam).createStream[String, String, StringDecoder, StringDecoder](ssc, topic.split(',').toSet) - stream.foreachRDD { rdd => - rdd.foreachPartitionWithOffsetRange { case (osr, part) => - val m = MutableHashMap.empty[Int, mutable.MutableList[String]] - for { - (k, v) <- part - line <- GraphUtil.parseString(v) - } { - try { - val sp = GraphUtil.split(line) - // get partition key by target vertex id - val partKey = getPartKey(sp(4), 20) - val values = m.getOrElse(partKey, mutable.MutableList.empty[String]) - values += line - m.update(partKey, values) - } catch { - case ex: Throwable => - log.error(s"$ex: $line") - } - } - - m.foreach { case (k, v) => - v.grouped(1000).foreach { grouped => - producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_ETL, null, k, grouped.mkString("\n"))) - } - } - - getStreamHelper(kafkaParam).commitConsumerOffset(osr) - } - } - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala b/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala deleted file mode 100644 index 4c0b927..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/stream/RankingCounterStreaming.scala +++ /dev/null @@ -1,74 +0,0 @@ -package s2.counter.stream - -import kafka.serializer.StringDecoder -import org.apache.spark.streaming.Durations._ -import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions -import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper} -import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig} -import s2.counter.core.CounterFunctions -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} - -/** - * Created by hsleep([email protected]) on 15. 6. 19.. 
- */ -object RankingCounterStreaming extends SparkApp with WithKafka { - lazy val config = S2ConfigFactory.config - lazy val s2Config = new S2CounterConfig(config) - lazy val className = getClass.getName.stripSuffix("$") - - lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) - - val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER_TRX) - val strInputTopics = inputTopics.mkString(",") - val groupId = buildKafkaGroupId(strInputTopics, "ranking_v2") - val kafkaParam = Map( -// "auto.offset.reset" -> "smallest", - "group.id" -> groupId, - "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, - "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, - "zookeeper.connection.timeout.ms" -> "10000" - ) - val streamHelper = StreamHelper(kafkaParam) - - override def run() = { - validateArgument("interval", "clear") - val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean) - - if (clear) { - streamHelper.kafkaHelper.consumerGroupCleanup() - } - - val conf = sparkConf(s"$strInputTopics: $className") - val ssc = streamingContext(conf, intervalInSec) - val sc = ssc.sparkContext - - implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - // make stream - val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics) - stream.foreachRDD { (rdd, ts) => - // for at-least once semantic - val nextRdd = { - CounterFunctions.makeRankingRdd(rdd, sc.defaultParallelism).foreachPartition { part => - // update ranking counter - CounterFunctions.updateRankingCounter(part, acc) - } - rdd - } - - streamHelper.commitConsumerOffsets(nextRdd.asInstanceOf[HasOffsetRanges]) -// CounterFunctions.makeRankingRdd(rdd, offsets.length).foreachPartitionWithIndex { (i, part) => -// // update ranking counter -// CounterFunctions.updateRankingCounter(part, acc) -// -// // commit offset range -// streamHelper.commitConsumerOffset(offsets(i)) -// } - } - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala b/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala deleted file mode 100644 index 9cbe212..0000000 --- a/s2counter_loader/src/main/scala/s2/models/DefaultCounterModel.scala +++ /dev/null @@ -1,8 +0,0 @@ -package s2.models - -import s2.config.S2ConfigFactory - -/** - * Created by hsleep([email protected]) on 15. 6. 8.. 
- */ -case object DefaultCounterModel extends CounterModel(S2ConfigFactory.config) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala new file mode 100644 index 0000000..14e335e --- /dev/null +++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala @@ -0,0 +1,33 @@ +package org.apache.s2graph.counter.loader.core + +import com.typesafe.config.ConfigFactory +import org.scalatest.{Matchers, BeforeAndAfterAll, FlatSpec} +import s2.models.DBModel + +/** + * Created by hsleep([email protected]) on 15. 7. 3.. + */ +class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with Matchers { + override def beforeAll: Unit = { + DBModel.initialize(ConfigFactory.load()) + } + + "CounterEtlFunctions" should "parsing log" in { + val data = + """ + |1435107139287 insert e aaPHfITGUU0B_150212123559509 abcd test_case {"cateid":"100110102","shopid":"1","brandid":""} + |1435106916136 insert e Tgc00-wtjp2B_140918153515441 efgh test_case {"cateid":"101104107","shopid":"2","brandid":""} + """.stripMargin.trim.split('\n') + val items = { + for { + line <- data + item <- CounterEtlFunctions.parseEdgeFormat(line) + } yield { + item.action should equal("test_case") + item + } + } + + items should have size 2 + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala new file mode 100644 index 0000000..c0f1db7 --- /dev/null +++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/DimensionPropsTest.scala @@ -0,0 +1,46 @@ +package org.apache.s2graph.counter.loader.core + +import org.scalatest.{FunSuite, Matchers} + +import scala.collection.mutable.ListBuffer + +/** + * Created by hsleep([email protected]) on 2015. 10. 7.. 
+ */ +class DimensionPropsTest extends FunSuite with Matchers { + test("makeRequestBody with Seq") { + val requestBody = + """ + |{ + | "_from" => [[_from]] + |} + """.stripMargin + val requestBodyExpected = + """ + |{ + | "_from" => 1 + |} + """.stripMargin + val requestBodyResult = DimensionProps.makeRequestBody(requestBody, Seq(("[[_from]]", "1")).toList) + + requestBodyResult shouldEqual requestBodyExpected + } + + test("makeRequestBody with ListBuffer") { + val requestBody = + """ + |{ + | "_from" => [[_from]] + |} + """.stripMargin + val requestBodyExpected = + """ + |{ + | "_from" => 1 + |} + """.stripMargin + val requestBodyResult = DimensionProps.makeRequestBody(requestBody, ListBuffer(("[[_from]]", "1")).toList) + + requestBodyResult shouldEqual requestBodyExpected + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala new file mode 100644 index 0000000..b716f86 --- /dev/null +++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreamingSpec.scala @@ -0,0 +1,198 @@ +package org.apache.s2graph.counter.loader.stream + +import org.apache.s2graph.core.{Management, GraphUtil} +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.counter.loader.counter.core.{CounterEtlItem, CounterFunctions} +import org.apache.s2graph.counter.loader.models.DefaultCounterModel +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.HashMapParam +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import play.api.libs.json.Json +import CounterFunctions.HashMapAccumulable +import s2.counter.core.TimedQualifier.IntervalUnit +import s2.counter.core._ +import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph} +import s2.helper.CounterAdmin +import s2.models.{Counter, DBModel} + +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success} + +/** + * Created by hsleep([email protected]) on 2015. 11. 19.. 
+ */ +class ExactCounterStreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + private val master = "local[2]" + private val appName = "exact_counter_streaming" + private val batchDuration = Seconds(1) + + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + + val admin = new CounterAdmin(S2ConfigFactory.config) + val graphOp = new GraphOperation(S2ConfigFactory.config) + val s2config = new S2CounterConfig(S2ConfigFactory.config) + + val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) + val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) + + val service = "test" + val action = "test_case" + + override def beforeAll(): Unit = { + DBModel.initialize(S2ConfigFactory.config) + + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + ssc = new StreamingContext(conf, batchDuration) + + sc = ssc.sparkContext + + // create test_case label + Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") + if (Label.findByName(action, useCache = false).isEmpty) { + val strJs = + s""" + |{ + | "label": "$action", + | "srcServiceName": "$service", + | "srcColumnName": "src", + | "srcColumnType": "string", + | "tgtServiceName": "$service", + | "tgtColumnName": "$action", + | "tgtColumnType": "string", + | "indices": [ + | ], + | "props": [ + | ] + |} + """.stripMargin + graphOp.createLabel(Json.parse(strJs)) + } + + // action + admin.deleteCounter(service, action).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = true)) + } + + override def afterAll(): Unit = { + admin.deleteCounter(service, action) + if (ssc != null) { + ssc.stop() + } + } + + "ExactCounter" should "update" in { + val policy = DefaultCounterModel.findByServiceAction(service, action).get + val data = + s""" + |1434534565675 $service $action 70362200_94013572857366866 {"is_shared":"false","relationship":"FE"} {"userId":"48255079","userIdType":"profile_id","value":"1"} + |1434534565675 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"48255079","userIdType":"profile_id","value":"1"} + |1434534566220 $service $action 51223360_94013140590929619 {"is_shared":"false","relationship":"FE"} {"userId":"312383","userIdType":"profile_id","value":"1"} + |1434534566508 $service $action 63808459_94013420047377826 {"is_shared":"false","relationship":"FE"} {"userId":"21968241","userIdType":"profile_id","value":"1"} + |1434534566210 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"6062217","userIdType":"profile_id","value":"1"} + |1434534566459 $service $action 49699692_94012186431261763 {"is_shared":"false","relationship":"FE"} {"userId":"67863471","userIdType":"profile_id","value":"1"} + |1434534565681 $service $action 64556827_94012311028641810 {"is_shared":"false","relationship":"FE"} {"userId":"19381218","userIdType":"profile_id","value":"1"} + |1434534565865 $service $action 41814266_94012477588942163 {"is_shared":"false","relationship":"FE"} {"userId":"19268547","userIdType":"profile_id","value":"1"} + |1434534565865 $service $action 66697741_94007840665633458 {"is_shared":"false","relationship":"FE"} {"userId":"19268547","userIdType":"profile_id","value":"1"} + |1434534566142 
$service $action 66444074_94012737377133826 {"is_shared":"false","relationship":"FE"} {"userId":"11917195","userIdType":"profile_id","value":"1"} + |1434534566077 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"37709890","userIdType":"profile_id","value":"1"} + |1434534565938 $service $action 40921487_94012905738975266 {"is_shared":"false","relationship":"FE"} {"userId":"59869223","userIdType":"profile_id","value":"1"} + |1434534566033 $service $action 64506628_93994707216829506 {"is_shared":"false","relationship":"FE"} {"userId":"50375575","userIdType":"profile_id","value":"1"} + |1434534566451 $service $action 40748868_94013448321919139 {"is_shared":"false","relationship":"FE"} {"userId":"12249539","userIdType":"profile_id","value":"1"} + |1434534566669 $service $action 64499956_94013227717457106 {"is_shared":"false","relationship":"FE"} {"userId":"25167419","userIdType":"profile_id","value":"1"} + |1434534566669 $service $action 66444074_94012737377133826 {"is_shared":"false","relationship":"FE"} {"userId":"25167419","userIdType":"profile_id","value":"1"} + |1434534566318 $service $action 64774665_94012837889027027 {"is_shared":"true","relationship":"F"} {"userId":"71557816","userIdType":"profile_id","value":"1"} + |1434534566274 $service $action 67075480_94008509166933763 {"is_shared":"false","relationship":"FE"} {"userId":"57931860","userIdType":"profile_id","value":"1"} + |1434534566659 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"19990823","userIdType":"profile_id","value":"1"} + |1434534566250 $service $action 70670053_93719933175630611 {"is_shared":"true","relationship":"F"} {"userId":"68897412","userIdType":"profile_id","value":"1"} + |1434534566402 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"15541439","userIdType":"profile_id","value":"1"} + |1434534566122 $service $action 48890741_94013463616012786 {"is_shared":"false","relationship":"FE"} {"userId":"48040409","userIdType":"profile_id","value":"1"} + |1434534566055 $service $action 64509008_94002318232678546 {"is_shared":"true","relationship":"F"} {"userId":"46532039","userIdType":"profile_id","value":"1"} + |1434534565994 $service $action 66644368_94009163363033795 {"is_shared":"false","relationship":"FE"} {"userId":"4143147","userIdType":"profile_id","value":"1"} + |1434534566448 $service $action 64587644_93938555963733954 {"is_shared":"false","relationship":"FE"} {"userId":"689042","userIdType":"profile_id","value":"1"} + |1434534565935 $service $action 52812511_94012009551561315 {"is_shared":"false","relationship":"FE"} {"userId":"35509692","userIdType":"profile_id","value":"1"} + |1434534566544 $service $action 70452048_94008573197583762 {"is_shared":"false","relationship":"FE"} {"userId":"5172421","userIdType":"profile_id","value":"1"} + |1434534565929 $service $action 54547023_94013384964278435 {"is_shared":"false","relationship":"FE"} {"userId":"33556498","userIdType":"profile_id","value":"1"} + |1434534566358 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"8987346","userIdType":"profile_id","value":"1"} + |1434534566057 $service $action 67075480_94008509166933763 {"is_shared":"false","relationship":"FE"} {"userId":"35134964","userIdType":"profile_id","value":"1"} + |1434534566140 $service $action 54547023_94013384964278435 {"is_shared":"false","relationship":"FE"} 
{"userId":"11900315","userIdType":"profile_id","value":"1"} + |1434534566158 $service $action 64639374_93888330176053635 {"is_shared":"true","relationship":"F"} {"userId":"49996643","userIdType":"profile_id","value":"1"} + |1434534566025 $service $action 67265128_94009084771192002 {"is_shared":"false","relationship":"FE"} {"userId":"37801480","userIdType":"profile_id","value":"1"} + """.stripMargin.trim + // println(data) + val rdd = sc.parallelize(Seq(("", data))) + + // rdd.foreachPartition { part => + // part.foreach(println) + // } + val resultRdd = CounterFunctions.makeExactRdd(rdd, 2) + val result = resultRdd.collect().toMap + + // result.foreachPartition { part => + // part.foreach(println) + // } + + val parsed = { + for { + line <- GraphUtil.parseString(data) + item <- CounterEtlItem(line).toSeq + ev <- CounterFunctions.exactMapper(item).toSeq + } yield { + ev + } + } + val parsedResult = parsed.groupBy(_._1).mapValues(values => values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L))) + + // parsedResult.foreach { case (k, v) => + // println(k, v) + // } + + result should not be empty + result should equal (parsedResult) + + val itemId = "46889329_94013502934177075" + val key = ExactKey(DefaultCounterModel.findByServiceAction(service, action).get, itemId, checkItemType = true) + val value = result.get(key) + + value should not be empty + value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, String])) should equal (Some(6L)) + + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (None) + + val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + resultRdd.foreachPartition { part => + CounterFunctions.updateExactCounter(part.toSeq, acc) + } + + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map.empty[String, String]) -> Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (Some(expected)) + } + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"))) should be (Some(expected)) + } + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("relationship" -> Set("FE"))) should be (Some(expected)) + } + Option(FetchedCountsGrouped(key, Map( + (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.relationship.false.FE") -> 6l) + ))).foreach { expected => + exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be (Some(expected)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala ---------------------------------------------------------------------- diff --git 
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala new file mode 100644 index 0000000..2a913e5 --- /dev/null +++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreamingSpec.scala @@ -0,0 +1,451 @@ +package org.apache.s2graph.counter.loader.stream + +import org.apache.s2graph.core.Management +import org.apache.s2graph.core.mysqls.Label +import org.apache.s2graph.counter.loader.counter.core.CounterFunctions +import org.apache.s2graph.counter.loader.models.DefaultCounterModel +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.HashMapParam +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import play.api.libs.json.Json +import CounterFunctions.HashMapAccumulable +import s2.counter.core.TimedQualifier.IntervalUnit +import s2.counter.core._ +import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph} +import s2.helper.CounterAdmin +import s2.models.{Counter, DBModel} + +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Success} + +/** + * Created by hsleep([email protected]) on 15. 6. 17.. + */ +class RankingCounterStreamingSpec extends FlatSpec with BeforeAndAfterAll with Matchers { + private val master = "local[2]" + private val appName = "ranking_counter_streaming" + private val batchDuration = Seconds(1) + + private var sc: SparkContext = _ + private var ssc: StreamingContext = _ + + val admin = new CounterAdmin(S2ConfigFactory.config) + val graphOp = new GraphOperation(S2ConfigFactory.config) + val s2config = new S2CounterConfig(S2ConfigFactory.config) + + val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) + val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) + + val service = "test" + val action = "test_case" + val action_base = "test_case_base" + val action_rate = "test_case_rate" + val action_rate_threshold = "test_case_rate_threshold" + val action_trend = "test_case_trend" + + override def beforeAll(): Unit = { + DBModel.initialize(S2ConfigFactory.config) + + val conf = new SparkConf() + .setMaster(master) + .setAppName(appName) + + ssc = new StreamingContext(conf, batchDuration) + + sc = ssc.sparkContext + + admin.setupCounterOnGraph() + + // create test_case label + Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") + if (Label.findByName(action, useCache = false).isEmpty) { + val strJs = + s""" + |{ + | "label": "$action", + | "srcServiceName": "$service", + | "srcColumnName": "src", + | "srcColumnType": "string", + | "tgtServiceName": "$service", + | "tgtColumnName": "$action", + | "tgtColumnType": "string", + | "indices": [ + | ], + | "props": [ + | ] + |} + """.stripMargin + graphOp.createLabel(Json.parse(strJs)) + } + if (Label.findByName(action_base, useCache = false).isEmpty) { + val strJs = + s""" + |{ + | "label": "$action_base", + | "srcServiceName": "$service", + | "srcColumnName": "src", + | "srcColumnType": "string", + | "tgtServiceName": "$service", + | "tgtColumnName": "$action", + | "tgtColumnType": "string", + | 
"indices": [ + | ], + | "props": [ + | ] + |} + """.stripMargin + graphOp.createLabel(Json.parse(strJs)) + } + + // action + admin.deleteCounter(service, action).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true)) + val policy = DefaultCounterModel.findByServiceAction(service, action).get + + // action_base + admin.deleteCounter(service, action_base).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action_base, Counter.ItemType.STRING, autoComb = true, "", useRank = true)) + val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get + + // action_rate + admin.deleteCounter(service, action_rate).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action_rate, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true, + rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id))) + + // action_rate_threshold + admin.deleteCounter(service, action_rate_threshold).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action_rate_threshold, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true, + rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id), rateThreshold = Some(3))) + + // action_trend + admin.deleteCounter(service, action_trend).foreach { + case Success(v) => + case Failure(ex) => + println(s"$ex") + } + admin.createCounter(Counter(useFlag = true, 2, service, action_trend, Counter.ItemType.STRING, autoComb = true, "p1", useRank = true, + rateActionId = Some(policy.id), rateBaseId = Some(policy.id))) + } + + override def afterAll(): Unit = { + admin.deleteCounter(service, action) + admin.deleteCounter(service, action_base) + admin.deleteCounter(service, action_rate) + admin.deleteCounter(service, action_rate_threshold) + admin.deleteCounter(service, action_trend) + if (ssc != null) { + ssc.stop() + } + } + + "RankingCounterStreaming" should "update" in { + val policy = DefaultCounterModel.findByServiceAction(service, action).get +// val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get + + rankingCounter.ready(policy) should equal (true) + val data = + s""" + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":3}]} + |{"success":false,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} + |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} + |{"success":true,"policyId":${policy.id},"item":"4","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} + """.stripMargin.trim + // println(data) + val rdd = sc.parallelize(Seq(("", data))) + + // rdd.foreachPartition { part => + // part.foreach(println) + // } + + val result = CounterFunctions.makeRankingRdd(rdd, 2).collect().toMap + + // result.foreachPartition { part => + // part.foreach(println) + // } + + val acc: HashMapAccumulable = 
sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + result should not be empty + val rankKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) + result should contain (rankKey -> Map( + "1" -> RankingValue(3, 1), + "3" -> RankingValue(2, 2), + "4" -> RankingValue(1, 1) + )) + + val key = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) + val value = result.get(key) + + value should not be empty + value.get.get("1").get should equal (RankingValue(3, 1)) + value.get.get("2") shouldBe empty + value.get.get("3").get should equal (RankingValue(2, 2)) + + rankingCounter.ready(policy) should equal (true) + + // delete, update and get + rankingCounter.delete(key) + Thread.sleep(1000) + CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc) + Thread.sleep(1000) + val rst = rankingCounter.getTopK(key) + + rst should not be empty +// rst.get.totalScore should equal(4f) + rst.get.values should contain allOf(("3", 2d), ("4", 1d), ("1", 3d)) + } + +// "rate by base" >> { +// val data = +// """ +// |{"success":true,"policyId":42,"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]} +// """.stripMargin.trim +// val rdd = sc.parallelize(Seq(("", data))) +// +// val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2).collect() +// trxLogRdd.foreach { log => +// CounterFunctions.rateBaseRankingMapper(log) must not be empty +// } +// +// true must_== true +// } + + it should "update rate ranking counter" in { + val policy = DefaultCounterModel.findByServiceAction(service, action).get + val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get + val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate).get + + // update base policy + val eq = ExactQualifier(TimedQualifier("M", 1433084400000l), "") + val exactKey = ExactKey(basePolicy, "1", checkItemType = true) + + // check base item count + exactCounter.updateCount(basePolicy, Seq( + (exactKey, Map(eq -> 2l)) + )) + Thread.sleep(1000) + + // direct get + val baseCount = exactCounter.getCount(basePolicy, "1", Seq(IntervalUnit.MONTHLY), 1433084400000l, 1433084400000l, Map.empty[String, Set[String]]) + baseCount should not be empty + baseCount.get should equal (FetchedCountsGrouped(exactKey, Map( + (eq.tq.q, Map.empty[String, String]) -> Map(eq-> 2l) + ))) + + // related get + val relatedCount = exactCounter.getRelatedCounts(basePolicy, Seq("1" -> Seq(eq))) + relatedCount should not be empty + relatedCount.get("1") should not be empty + relatedCount.get("1").get should equal (Map(eq -> 2l)) + + val data = + s""" + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]} + |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]} + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":1,"result":1}]} + 
|{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":2,"result":4}]} + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} + """.stripMargin.trim + // println(data) + val rdd = sc.parallelize(Seq(("", data))) + + // rdd.foreachPartition { part => + // part.foreach(println) + // } + + val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2) + trxLogRdd.count() should equal (data.trim.split('\n').length) + + val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2) + itemRankingRdd.foreach(println) + + val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id) + result.foreach(println) + result should have size 3 + + val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + // rate ranking + val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) + val value = result.get(key) + +// println(key, value) + + value should not be empty + value.get.get("1") should not be empty + value.get.get("1").get should equal (RankingValue(1, 0)) + value.get.get("2").get should equal (RankingValue(0.25, 0)) + + val key2 = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "p1.1")) + val value2 = result.get(key2) + +// println(key2, value2) + + val values = value.map(v => (key, v)).toSeq ++ value2.map(v => (key2, v)).toSeq + println(s"values: $values") + + // delete, update and get + rankingCounter.delete(key) + rankingCounter.delete(key2) + Thread.sleep(1000) + CounterFunctions.updateRankingCounter(values, acc) + // for update graph + Thread.sleep(1000) + + val rst = rankingCounter.getTopK(key) + rst should not be empty + rst.get.values should equal (Seq(("1", 1d), ("2", 0.25d))) + + val rst2 = rankingCounter.getTopK(key2) + rst2 should not be empty + rst2.get.values should equal (Seq(("2", 0.25d))) + } + + it should "update rate ranking counter with threshold" in { + val policy = DefaultCounterModel.findByServiceAction(service, action).get + val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get + val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate_threshold).get + + val data = + s""" + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]} + |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]} + |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]} + """.stripMargin.trim + // println(data) + val rdd = sc.parallelize(Seq(("", data))) + + // 
rdd.foreachPartition { part => + // part.foreach(println) + // } + + val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2) + trxLogRdd.count() should equal (data.trim.split('\n').length) + + val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2) + itemRankingRdd.foreach(println) + + val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id) + result.foreach(println) + result should have size 2 + + val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + // rate ranking + val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) + val value = result.get(key) + + value should not be empty + value.get.get("1") should be (None) + value.get.get("2").get should equal (RankingValue(0.25, 0)) + + // delete, update and get + rankingCounter.delete(key) + Thread.sleep(1000) + CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc) + Thread.sleep(1000) + val rst = rankingCounter.getTopK(key) + + rst should not be empty + rst.get.values should equal (Seq(("2", 0.25d))) + } + + it should "update trend ranking counter" in { + val policy = DefaultCounterModel.findByServiceAction(service, action).get + val trendPolicy = DefaultCounterModel.findByServiceAction(service, action_trend).get + + val exactKey1 = ExactKey(policy, "1", checkItemType = true) + val exactKey2 = ExactKey(policy, "2", checkItemType = true) + // update old key value + val tq1 = TimedQualifier("M", 1435676400000l) + val tq2 = TimedQualifier("M", 1427814000000l) + exactCounter.updateCount(policy, Seq( + exactKey1 -> Map(ExactQualifier(tq1.add(-1), "") -> 1l, ExactQualifier(tq2.add(-1), "") -> 92l) + )) + val eq1 = ExactQualifier(tq1, "") + val eq2 = ExactQualifier(tq2, "") + + val oldCount = exactCounter.getPastCounts(policy, Seq("1" -> Seq(eq1, eq2), "2" -> Seq(eq1, eq1.copy(dimension = "gender.M")))) + oldCount should not be empty + oldCount.get("1").get should equal(Map(eq1 -> 1l, eq2 -> 92l)) + oldCount.get("2") should be (None) + + val data = + s""" + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":2}]} + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1435676400000,"value":1,"result":1}]} + |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1427814000000,"value":1,"result":92}]} + """.stripMargin.trim + // println(data) + val rdd = sc.parallelize(Seq(("", data))) + + // rdd.foreachPartition { part => + // part.foreach(println) + // } + + val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2) + trxLogRdd.count() should equal (data.trim.split('\n').length) + + val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2) + itemRankingRdd.foreach(println) + + val result = CounterFunctions.trendRankingCount(itemRankingRdd, 2).collect().toMap + result.foreach(println) + // dimension gender.M is ignored, because gender is not defined dimension in trend policy. 
+ result should have size 2 + + val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + // trend ranking + val key = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1435676400000L), "")) + val value = result.get(key) + + value should not be empty + value.get.get("1").get should equal (RankingValue(2, 0)) + value.get.get("2").get should equal (RankingValue(1, 0)) + + val key2 = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1427814000000L), "")) + val value2 = result.get(key2) + + value2 should not be empty + value2.get.get("1").get should equal (RankingValue(1, 0)) + + // delete, update and get + rankingCounter.delete(key) + Thread.sleep(1000) + CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc) + Thread.sleep(1000) + val rst = rankingCounter.getTopK(key) + + rst should not be empty + rst.get.values should equal (Seq("1" -> 2, "2" -> 1)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala deleted file mode 100644 index 520b30f..0000000 --- a/s2counter_loader/src/test/scala/s2/counter/core/CounterEtlFunctionsSpec.scala +++ /dev/null @@ -1,33 +0,0 @@ -package s2.counter.core - -import com.typesafe.config.ConfigFactory -import org.scalatest.{Matchers, BeforeAndAfterAll, FlatSpec} -import s2.models.DBModel - -/** - * Created by hsleep([email protected]) on 15. 7. 3.. - */ -class CounterEtlFunctionsSpec extends FlatSpec with BeforeAndAfterAll with Matchers { - override def beforeAll: Unit = { - DBModel.initialize(ConfigFactory.load()) - } - - "CounterEtlFunctions" should "parsing log" in { - val data = - """ - |1435107139287 insert e aaPHfITGUU0B_150212123559509 abcd test_case {"cateid":"100110102","shopid":"1","brandid":""} - |1435106916136 insert e Tgc00-wtjp2B_140918153515441 efgh test_case {"cateid":"101104107","shopid":"2","brandid":""} - """.stripMargin.trim.split('\n') - val items = { - for { - line <- data - item <- CounterEtlFunctions.parseEdgeFormat(line) - } yield { - item.action should equal("test_case") - item - } - } - - items should have size 2 - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala b/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala deleted file mode 100644 index b658d05..0000000 --- a/s2counter_loader/src/test/scala/s2/counter/core/DimensionPropsTest.scala +++ /dev/null @@ -1,46 +0,0 @@ -package s2.counter.core - -import org.scalatest.{FunSuite, Matchers} - -import scala.collection.mutable.ListBuffer - -/** - * Created by hsleep([email protected]) on 2015. 10. 7.. 
- */ -class DimensionPropsTest extends FunSuite with Matchers { - test("makeRequestBody with Seq") { - val requestBody = - """ - |{ - | "_from" => [[_from]] - |} - """.stripMargin - val requestBodyExpected = - """ - |{ - | "_from" => 1 - |} - """.stripMargin - val requestBodyResult = DimensionProps.makeRequestBody(requestBody, Seq(("[[_from]]", "1")).toList) - - requestBodyResult shouldEqual requestBodyExpected - } - - test("makeRequestBody with ListBuffer") { - val requestBody = - """ - |{ - | "_from" => [[_from]] - |} - """.stripMargin - val requestBodyExpected = - """ - |{ - | "_from" => 1 - |} - """.stripMargin - val requestBodyResult = DimensionProps.makeRequestBody(requestBody, ListBuffer(("[[_from]]", "1")).toList) - - requestBodyResult shouldEqual requestBodyExpected - } -}
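A note for readers following the DimensionPropsTest diff just above: the two test cases differ only in the collection type they pass in (Seq vs. ListBuffer, both converted to List), and both assert that the "[[_from]]" token in the request-body template is replaced by the supplied value. The snippet below is a minimal, self-contained sketch of that token substitution under the assumption that makeRequestBody does plain string replacement; the object and method names here are illustrative stand-ins, not the project's API.

    // Illustrative only: a stand-in for the behaviour DimensionPropsTest exercises.
    // Assumes the real DimensionProps.makeRequestBody does plain token replacement.
    object RequestBodySketch {
      def makeRequestBody(template: String, replacements: List[(String, String)]): String =
        replacements.foldLeft(template) { case (body, (token, value)) =>
          body.replace(token, value)
        }

      def main(args: Array[String]): Unit = {
        val template =
          """
            |{
            |  "_from" => [[_from]]
            |}
          """.stripMargin
        // Mirrors both test cases: the Seq and the ListBuffer are each passed on as a List.
        println(makeRequestBody(template, List("[[_from]]" -> "1")))
      }
    }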

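The rate-ranking assertions in RankingCounterStreamingSpec above are consistent with a simple ratio: an item's rate score is its action count divided by its base count, and a configured rateThreshold drops items whose base count is missing or too small (item "2" scores 1 / 4 = 0.25 in both rate tests, and item "1" disappears once the counter is created with rateThreshold = Some(3)). The sketch below shows only that arithmetic, inferred from the test fixtures; it is not the CounterFunctions.rateRankingCount implementation, and all names in it are made up for illustration.

    // Illustrative only: rate scoring as implied by the test expectations above.
    object RateRankingSketch {
      // (action count, base count) per item; item "2" matches the fixture data
      // (action result 1, base result 4). Items without a base count, such as
      // item "1" in the tests, are handled separately by the real code.
      val counts: Map[String, (Double, Double)] = Map(
        "2" -> (1.0, 4.0)
      )

      // Keep items whose base count meets the threshold and score them as
      // actionCount / baseCount.
      def rateScores(threshold: Double): Map[String, Double] =
        counts.collect {
          case (item, (action, base)) if base > 0 && base >= threshold =>
            item -> action / base
        }

      def main(args: Array[String]): Unit = {
        println(rateScores(threshold = 0)) // Map(2 -> 0.25), matching RankingValue(0.25, 0)
        println(rateScores(threshold = 3)) // item "2" kept because its base count 4 >= 3
      }
    }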