http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/FunctionParser.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/FunctionParser.scala b/s2counter_core/src/main/scala/s2/util/FunctionParser.scala
deleted file mode 100644
index 2454b0f..0000000
--- a/s2counter_core/src/main/scala/s2/util/FunctionParser.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 29..
- */
-object FunctionParser {
-  val funcRe = """([a-zA-Z_]+)(\((\d+)?\))?""".r
-
-  def apply(str: String): Option[(String, String)] = {
-    str match {
-      case funcRe(funcName, funcParam, funcArg) =>
-        funcName match {
-          case x: String =>
-            Some((funcName, funcArg match {
-              case x: String => funcArg
-              case null => ""
-            }))
-          case null => None
-        }
-      case _ =>
-        None
-    }
-  }
-}
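As a usage sketch (illustrative, not part of this commit), the deleted FunctionParser splits an expression such as "latest(10)" into a function name and an optional numeric argument. The example object below is hypothetical and assumes the object defined in the diff above.

// Illustrative only; assumes the FunctionParser shown in the diff above.
object FunctionParserExample extends App {
  println(FunctionParser("latest(10)"))  // Some((latest,10)): name plus numeric argument
  println(FunctionParser("latest"))      // Some((latest,)): a missing argument becomes ""
  println(FunctionParser("10"))          // None: the name must match [a-zA-Z_]+
}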
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/Hashes.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/Hashes.scala b/s2counter_core/src/main/scala/s2/util/Hashes.scala
deleted file mode 100644
index 2edbcd8..0000000
--- a/s2counter_core/src/main/scala/s2/util/Hashes.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.util
-
-import org.apache.hadoop.hbase.util.Bytes
-
-import scala.util.hashing.MurmurHash3
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 27..
- */
-object Hashes {
-  def sha1(s: String): String = {
-    val md = java.security.MessageDigest.getInstance("SHA-1")
-    Bytes.toHex(md.digest(s.getBytes("UTF-8")))
-  }
-
-  private def positiveHash(h: Int): Int = {
-    if (h < 0) -1 * (h + 1) else h
-  }
-
-  def murmur3(s: String): Int = {
-    val hash = MurmurHash3.stringHash(s)
-    positiveHash(hash)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala b/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala
deleted file mode 100644
index 9c13fa0..0000000
--- a/s2counter_core/src/main/scala/s2/util/ReduceMapValue.scala
+++ /dev/null
@@ -1,12 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 7. 20..
- */
-class ReduceMapValue[T, U](op: (U, U) => U, default: U) {
-  def apply(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = {
-    m1 ++ m2.map { case (k, v) =>
-      k -> op(m1.getOrElse(k, default), v)
-    }
-  }
-}
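As an aside (illustrative, not part of this commit), the deleted ReduceMapValue merges two maps, combining the values of shared keys with the supplied operator; keys present only in the second map are combined with the default. The example below is hypothetical and assumes the class defined in the diff above.

// Illustrative only; assumes the ReduceMapValue class shown in the diff above.
object ReduceMapValueExample extends App {
  val sumMaps = new ReduceMapValue[String, Long](_ + _, 0L)
  val merged = sumMaps(Map("a" -> 1L, "b" -> 2L), Map("b" -> 3L, "c" -> 4L))
  println(merged)  // Map(a -> 1, b -> 5, c -> 4)
}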
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/Retry.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/Retry.scala b/s2counter_core/src/main/scala/s2/util/Retry.scala
deleted file mode 100644
index d1f7213..0000000
--- a/s2counter_core/src/main/scala/s2/util/Retry.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package s2.util
-
-import scala.annotation.tailrec
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-/**
- * Created by hsleep([email protected]) on 15. 1. 6..
- */
-object Retry {
-  @tailrec
-  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => T): T = {
-    Try { fn } match {
-      case Success(x) => x
-      case Failure(e) if e.isInstanceOf[RetryStopException] => throw e.getCause
-      case _ if n > 1 =>
-        // backoff
-        if (withSleep) Thread.sleep(tryCount * 1000)
-        apply(n - 1, withSleep, tryCount + 1)(fn)
-      case Failure(e) => throw e
-    }
-  }
-}
-
-object RetryAsync {
-  def apply[T](n: Int, withSleep: Boolean = true, tryCount: Int = 0)(fn: => Future[T])(implicit ex: ExecutionContext): Future[T] = {
-    val promise = Promise[T]()
-    fn onComplete {
-      case Success(x) => promise.success(x)
-      case Failure(e) if e.isInstanceOf[RetryStopException] => promise.failure(e.getCause)
-      case _ if n > 1 =>
-        // backoff
-        if (withSleep) Thread.sleep(tryCount * 1000)
-        apply(n - 1, withSleep, tryCount + 1)(fn)
-      case Failure(e) => promise.failure(e)
-    }
-    promise.future
-  }
-}
-
-class RetryStopException(message: String, cause: Throwable)
-  extends Exception(message, cause) {
-
-  def this(message: String) = this(message, null)
-
-  def this(cause: Throwable) = this(cause.toString, cause)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/SplitBytes.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/SplitBytes.scala b/s2counter_core/src/main/scala/s2/util/SplitBytes.scala
deleted file mode 100644
index a5ca998..0000000
--- a/s2counter_core/src/main/scala/s2/util/SplitBytes.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 12..
- */
-object SplitBytes {
-  def apply(bytes: Array[Byte], sizes: Seq[Int]): Seq[Array[Byte]] = {
-    if (sizes.sum > bytes.length) {
-      throw new Exception(s"sizes.sum bigger than bytes.length ${sizes.sum} > ${bytes.length}} ")
-    }
-
-    var position = 0
-    val rtn = {
-      for {
-        size <- sizes
-      } yield {
-        val slice = bytes.slice(position, position + size)
-        position += size
-        slice
-      }
-    }
-    rtn ++ Seq(bytes.drop(position))
-  }
-}
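As a usage sketch (illustrative, not part of this commit), the deleted Retry re-evaluates its block up to n times, sleeping tryCount seconds between attempts when withSleep is true, while a RetryStopException stops retrying and rethrows its cause. The example below is hypothetical and assumes the objects defined in the diff above.

// Illustrative only; assumes the Retry and RetryStopException shown in the diff above.
object RetryExample extends App {
  var attempts = 0
  val result = Retry(3, withSleep = false) {
    attempts += 1
    if (attempts < 3) throw new RuntimeException(s"transient failure #$attempts")
    "ok"  // succeeds on the third attempt; otherwise the last failure is rethrown
  }
  println(s"result=$result after $attempts attempts")  // result=ok after 3 attempts
}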
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/main/scala/s2/util/UnitConverter.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/util/UnitConverter.scala b/s2counter_core/src/main/scala/s2/util/UnitConverter.scala
deleted file mode 100644
index fb0b0d0..0000000
--- a/s2counter_core/src/main/scala/s2/util/UnitConverter.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package s2.util
-
-/**
- * Created by hsleep([email protected]) on 15. 4. 3..
- */
-object UnitConverter {
-  def toMillis(ts: Int): Long = {
-    ts * 1000L
-  }
-
-  def toMillis(ts: Long): Long = {
-    if (ts <= Int.MaxValue) {
-      ts * 1000
-    } else {
-      ts
-    }
-  }
-
-  def toMillis(s: String): Long = {
-    toMillis(s.toLong)
-  }
-
-  def toHours(ts: Long): Long = {
-    toMillis(ts) / HOUR_MILLIS * HOUR_MILLIS
-  }
-
-  val HOUR_MILLIS = 60 * 60 * 1000
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
new file mode 100644
index 0000000..1ae7b12
--- /dev/null
+++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
@@ -0,0 +1,166 @@
+package org.apache.s2graph.counter.core
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.{Graph, Management}
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core.v2.{GraphOperation, RankingStorageGraph}
+import org.apache.s2graph.counter.helper.CounterAdmin
+import org.apache.s2graph.counter.models.{Counter, CounterModel, DBModel}
+import org.apache.s2graph.counter.util.Retry
+import org.specs2.mutable.Specification
+import org.specs2.specification.BeforeAfterAll
+import play.api.libs.json.Json
+
+import scala.util.{Failure, Random, Success, Try}
+
+class RankingCounterSpec extends Specification with BeforeAfterAll {
+  val config = ConfigFactory.load()
+  DBModel.initialize(config)
+  val counterModel = new CounterModel(config)
+  val admin = new CounterAdmin(config)
+
+  val s2config = new S2CounterConfig(config)
+
+//  val rankingCounterV1 = new RankingCounter(config, new RankingStorageV1(config))
+//  val rankingCounterV2 = new RankingCounter(config, new RankingStorageV2(config))
+  val rankingCounterV2 = new RankingCounter(config, new RankingStorageGraph(config))
+
+//  "RankingCounterV1" >> {
+//    val policy = counterModel.findByServiceAction("test", "test_action", useCache = false).get
+//    val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String]))
+//    "get top k" >> {
+//      val result = rankingCounterV1.getTopK(rankingKey, 100)
+//
+//      println(result)
+//
+//      result must not be empty
+//    }
+//
+//    "get and increment" >> {
+//      val result = rankingCounterV1.getTopK(rankingKey, 100).get
+//
+//      val value = 2d
+//      val contents = {
+//        for {
+//          (item, score) <- result.values
+//        } yield {
+//          item -> RankingValue(score + value, value)
+//        }
+//      }.toMap
+//      rankingCounterV1.update(rankingKey, contents, 100)
+//
+//      val result2 = rankingCounterV1.getTopK(rankingKey, 100).get
+//
+//      result2.totalScore must_== result.totalScore + contents.values.map(_.increment).sum
+//      result2.values must containTheSameElementsAs(result.values.map { case (k, v) => (k, v + value) })
+//    }
+//  }
+
+  val service = "test"
+  val action = "test_case"
+
+  override def beforeAll: Unit = {
+    Try {
+      Retry(3) {
+        admin.setupCounterOnGraph
+      }
+
+      val graphOp = new GraphOperation(config)
+      val graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+      val management = new Management(graph)
+
management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") + val strJs = + s""" + |{ + | "label": "$action", + | "srcServiceName": "$service", + | "srcColumnName": "src", + | "srcColumnType": "string", + | "tgtServiceName": "$service", + | "tgtColumnName": "$action", + | "tgtColumnType": "string", + | "indices": [ + | ], + | "props": [ + | ] + |} + """.stripMargin + Retry(3) { + if (Label.findByName(action).isEmpty) { + graphOp.createLabel(Json.parse(strJs)) + } + } + + admin.deleteCounter(service, action).foreach { + case Failure(ex) => + println(s"$ex") + throw ex + case Success(v) => + } + admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true)) + } match { + case Failure(ex) => + println(s"$ex") + case Success(_) => + } + } + + override def afterAll: Unit = { + admin.deleteCounter(service, action) + } + + "RankingCounterV2" >> { + "get top k" >> { + val policy = counterModel.findByServiceAction(service, action, useCache = true).get + + val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String])) + + val orgMap = Map( + "1" -> 1d, + "2" -> 2d, + "3" -> 3d, + "4" -> 4d, + "5" -> 5d, + "6" -> 6d, + "7" -> 7d, + "8" -> 8d, + "9" -> 9d, + "10" -> 10d, + "11" -> 11d, + "12" -> 12d, + "13" -> 13d, + "14" -> 14d, + "15" -> 15d, + "16" -> 16d, + "17" -> 17d, + "18" -> 18d, + "19" -> 19d, + "20" -> 20d, + "100" -> 100d + ) + + val valueMap = Random.shuffle(orgMap).toMap + + val predictResult = valueMap.toSeq.sortBy(-_._2) + + val rvMap = valueMap.map { case (k, score) => + k -> RankingValue(score, 0) + } + Try { + rankingCounterV2.update(rankingKey, rvMap, 100) + }.isSuccess must_== true + + Thread.sleep(1000) + + val result : RankingResult = rankingCounterV2.getTopK(rankingKey, 10).get + + println(result.values) + + result must not be empty + result.values must have size 10 + result.values must_== predictResult.take(10) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala new file mode 100644 index 0000000..a52b8f9 --- /dev/null +++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala @@ -0,0 +1,50 @@ +package org.apache.s2graph.counter.models + +import com.typesafe.config.ConfigFactory +import org.specs2.mutable.Specification + +class CounterModelSpec extends Specification { + val config = ConfigFactory.load() + + DBModel.initialize(config) + + "CounterModel" should { + val model = new CounterModel(config) + "findById" in { + model.findById(0, useCache = false) must beNone + } + + "findByServiceAction using cache" in { + val service = "test" + val action = "test_action" + val counter = Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, + autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None) + model.createServiceAction(counter) + model.findByServiceAction(service, action, useCache = false) must beSome + val opt = model.findByServiceAction(service, action, useCache = true) + opt must beSome + model.findById(opt.get.id) must beSome + 
model.deleteServiceAction(opt.get) + model.findById(opt.get.id) must beSome + model.findById(opt.get.id, useCache = false) must beNone + } + + "create and delete policy" in { + val (service, action) = ("test", "test_case") + for { + policy <- model.findByServiceAction(service, action, useCache = false) + } { + model.deleteServiceAction(policy) + } + model.createServiceAction(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, + autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None)) + model.findByServiceAction(service, action, useCache = false).map { policy => + policy.service mustEqual service + policy.action mustEqual action + model.deleteServiceAction(policy) + policy + } must beSome + model.findByServiceAction(service, action, useCache = false) must beNone + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala new file mode 100644 index 0000000..220c30f --- /dev/null +++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSpec.scala @@ -0,0 +1,33 @@ +package org.apache.s2graph.counter.models + +import org.apache.s2graph.counter.models.Counter.ItemType +import org.specs2.mutable.Specification + +class CounterSpec extends Specification { + "Counter" should { + "dimension auto combination" in { + val policy = Counter( + useFlag = true, + 2, + "test", + "test_case", + ItemType.LONG, + autoComb = true, + "p1,p2,p3", + useProfile = false, + None, + useRank = true, + 0, + None, + None, + None, + None, + None, + None + ) + + policy.dimensionSp mustEqual Array("p1", "p2", "p3") + policy.dimensionList.map { arr => arr.toSeq }.toSet -- Set(Seq.empty[String], Seq("p1"), Seq("p2"), Seq("p3"), Seq("p1", "p2"), Seq("p1", "p3"), Seq("p2", "p3"), Seq("p1", "p2", "p3")) must beEmpty + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala deleted file mode 100644 index ea67a2a..0000000 --- a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala +++ /dev/null @@ -1,169 +0,0 @@ -package s2.counter.core - -import com.kakao.s2graph.core.{Management, Graph} -import com.kakao.s2graph.core.mysqls.Label -import com.typesafe.config.ConfigFactory -import org.specs2.mutable.Specification -import org.specs2.specification.BeforeAfterAll -import play.api.libs.json.Json -import s2.config.S2CounterConfig -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core.v2.{GraphOperation, RankingStorageGraph} -import s2.helper.CounterAdmin -import s2.models.{Counter, CounterModel, DBModel} -import s2.util.Retry - -import scala.util.{Failure, Random, Success, Try} - -/** - * Created by hsleep([email protected]) on 15. 6. 19.. 
- */ -class RankingCounterSpec extends Specification with BeforeAfterAll { - val config = ConfigFactory.load() - DBModel.initialize(config) - val counterModel = new CounterModel(config) - val admin = new CounterAdmin(config) - - val s2config = new S2CounterConfig(config) - -// val rankingCounterV1 = new RankingCounter(config, new RankingStorageV1(config)) -// val rankingCounterV2 = new RankingCounter(config, new RankingStorageV2(config)) - val rankingCounterV2 = new RankingCounter(config, new RankingStorageGraph(config)) - -// "RankingCounterV1" >> { -// val policy = counterModel.findByServiceAction("test", "test_action", useCache = false).get -// val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String])) -// "get top k" >> { -// val result = rankingCounterV1.getTopK(rankingKey, 100) -// -// println(result) -// -// result must not be empty -// } -// -// "get and increment" >> { -// val result = rankingCounterV1.getTopK(rankingKey, 100).get -// -// val value = 2d -// val contents = { -// for { -// (item, score) <- result.values -// } yield { -// item -> RankingValue(score + value, value) -// } -// }.toMap -// rankingCounterV1.update(rankingKey, contents, 100) -// -// val result2 = rankingCounterV1.getTopK(rankingKey, 100).get -// -// result2.totalScore must_== result.totalScore + contents.values.map(_.increment).sum -// result2.values must containTheSameElementsAs(result.values.map { case (k, v) => (k, v + value) }) -// } -// } - - val service = "test" - val action = "test_case" - - override def beforeAll: Unit = { - Try { - Retry(3) { - admin.setupCounterOnGraph - } - - val graphOp = new GraphOperation(config) - val graph = new Graph(config)(scala.concurrent.ExecutionContext.global) - val management = new Management(graph) - management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") - val strJs = - s""" - |{ - | "label": "$action", - | "srcServiceName": "$service", - | "srcColumnName": "src", - | "srcColumnType": "string", - | "tgtServiceName": "$service", - | "tgtColumnName": "$action", - | "tgtColumnType": "string", - | "indices": [ - | ], - | "props": [ - | ] - |} - """.stripMargin - Retry(3) { - if (Label.findByName(action).isEmpty) { - graphOp.createLabel(Json.parse(strJs)) - } - } - - admin.deleteCounter(service, action).foreach { - case Failure(ex) => - println(s"$ex") - throw ex - case Success(v) => - } - admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true)) - } match { - case Failure(ex) => - println(s"$ex") - case Success(_) => - } - } - - override def afterAll: Unit = { - admin.deleteCounter(service, action) - } - - "RankingCounterV2" >> { - "get top k" >> { - val policy = counterModel.findByServiceAction(service, action, useCache = true).get - - val rankingKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier(IntervalUnit.TOTAL, 0L), Map.empty[String, String])) - - val orgMap = Map( - "1" -> 1d, - "2" -> 2d, - "3" -> 3d, - "4" -> 4d, - "5" -> 5d, - "6" -> 6d, - "7" -> 7d, - "8" -> 8d, - "9" -> 9d, - "10" -> 10d, - "11" -> 11d, - "12" -> 12d, - "13" -> 13d, - "14" -> 14d, - "15" -> 15d, - "16" -> 16d, - "17" -> 17d, - "18" -> 18d, - "19" -> 19d, - "20" -> 20d, - "100" -> 100d - ) - - val valueMap = Random.shuffle(orgMap).toMap - - val predictResult = valueMap.toSeq.sortBy(-_._2) - - val rvMap = valueMap.map { case (k, score) => - k -> RankingValue(score, 0) - } - Try { 
- rankingCounterV2.update(rankingKey, rvMap, 100) - }.isSuccess must_== true - - Thread.sleep(1000) - - val result : RankingResult = rankingCounterV2.getTopK(rankingKey, 10).get - - println(result.values) - - result must not be empty - result.values must have size 10 - result.values must_== predictResult.take(10) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala b/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala deleted file mode 100644 index dff55ef..0000000 --- a/s2counter_core/src/test/scala/s2/models/CounterModelSpec.scala +++ /dev/null @@ -1,53 +0,0 @@ -package s2.models - -import com.typesafe.config.ConfigFactory -import org.specs2.mutable.Specification - -/** - * Created by hsleep([email protected]) on 15. 5. 26.. - */ -class CounterModelSpec extends Specification { - val config = ConfigFactory.load() - - DBModel.initialize(config) - - "CounterModel" should { - val model = new CounterModel(config) - "findById" in { - model.findById(0, useCache = false) must beNone - } - - "findByServiceAction using cache" in { - val service = "test" - val action = "test_action" - val counter = Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, - autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None) - model.createServiceAction(counter) - model.findByServiceAction(service, action, useCache = false) must beSome - val opt = model.findByServiceAction(service, action, useCache = true) - opt must beSome - model.findById(opt.get.id) must beSome - model.deleteServiceAction(opt.get) - model.findById(opt.get.id) must beSome - model.findById(opt.get.id, useCache = false) must beNone - } - - "create and delete policy" in { - val (service, action) = ("test", "test_case") - for { - policy <- model.findByServiceAction(service, action, useCache = false) - } { - model.deleteServiceAction(policy) - } - model.createServiceAction(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, - autoComb = true, "", useProfile = true, None, useRank = true, 0, None, None, None, None, None, None)) - model.findByServiceAction(service, action, useCache = false).map { policy => - policy.service mustEqual service - policy.action mustEqual action - model.deleteServiceAction(policy) - policy - } must beSome - model.findByServiceAction(service, action, useCache = false) must beNone - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_core/src/test/scala/s2/models/CounterSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_core/src/test/scala/s2/models/CounterSpec.scala b/s2counter_core/src/test/scala/s2/models/CounterSpec.scala deleted file mode 100644 index a03c70e..0000000 --- a/s2counter_core/src/test/scala/s2/models/CounterSpec.scala +++ /dev/null @@ -1,36 +0,0 @@ -package s2.models - -import org.specs2.mutable.Specification -import s2.models.Counter.ItemType - -/** - * Created by hsleep([email protected]) on 15. 6. 11.. 
- */ -class CounterSpec extends Specification { - "Counter" should { - "dimension auto combination" in { - val policy = Counter( - useFlag = true, - 2, - "test", - "test_case", - ItemType.LONG, - autoComb = true, - "p1,p2,p3", - useProfile = false, - None, - useRank = true, - 0, - None, - None, - None, - None, - None, - None - ) - - policy.dimensionSp mustEqual Array("p1", "p2", "p3") - policy.dimensionList.map { arr => arr.toSeq }.toSet -- Set(Seq.empty[String], Seq("p1"), Seq("p2"), Seq("p3"), Seq("p1", "p2"), Seq("p1", "p3"), Seq("p2", "p3"), Seq("p1", "p2", "p3")) must beEmpty - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala new file mode 100644 index 0000000..64da217 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/CounterBulkLoader.scala @@ -0,0 +1,78 @@ +package org.apache.s2graph.counter.loader + +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.core.BlobExactKey +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.loader.core.{CounterFunctions, CounterEtlFunctions} +import org.apache.s2graph.counter.models.Counter.ItemType +import org.apache.s2graph.counter.models.{DBModel, CounterModel} +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.{HashMapParam, SparkApp, WithKafka} +import org.apache.spark.SparkContext + +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext + +object CounterBulkLoader extends SparkApp with WithKafka { + lazy val config = S2ConfigFactory.config + lazy val s2Config = new S2CounterConfig(config) + lazy val counterModel = new CounterModel(config) + lazy val className = getClass.getName.stripSuffix("$") + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + + implicit val ec = ExecutionContext.Implicits.global + + val initialize = { + println("initialize") + // Graph(config) + DBModel.initialize(config) + true + } + + override def run(): Unit = { + val hdfsPath = args(0) + val blockSize = args(1).toInt + val minPartitions = args(2).toInt + val conf = sparkConf(s"$hdfsPath: CounterBulkLoader") + + val sc = new SparkContext(conf) + val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + val msgs = sc.textFile(hdfsPath) + + val etlRdd = msgs.repartition(minPartitions).mapPartitions { part => + // parse and etl + assert(initialize) + val items = { + for { + msg <- part + line <- GraphUtil.parseString(msg) + sp = GraphUtil.split(line) if sp.size <= 7 || GraphUtil.split(line)(7) != "in" + item <- CounterEtlFunctions.parseEdgeFormat(line) + } yield { + acc +=("Edges", 1) + item + } + } + items.grouped(blockSize).flatMap { grouped => + grouped.groupBy(e => (e.service, e.action)).flatMap { case ((service, action), v) => + CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v.toList) + } + } + } + + val exactRdd = CounterFunctions.exactCountFromEtl(etlRdd, etlRdd.partitions.length) + val logRdd = exactRdd.mapPartitions { part => + val 
seq = part.toSeq + CounterFunctions.insertBlobValue(seq.map(_._1).filter(_.itemType == ItemType.BLOB).map(_.asInstanceOf[BlobExactKey]), acc) + // update exact counter + CounterFunctions.updateExactCounter(seq, acc).toIterator + } + + val rankRdd = CounterFunctions.makeRankingRddFromTrxLog(logRdd, logRdd.partitions.length) + rankRdd.foreachPartition { part => + CounterFunctions.updateRankingCounter(part, acc) + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala new file mode 100644 index 0000000..8bdc5ba --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/EraseDailyCounter.scala @@ -0,0 +1,133 @@ +package org.apache.s2graph.counter.loader + +import java.text.SimpleDateFormat + +import kafka.producer.KeyedMessage +import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap +import org.apache.s2graph.counter.core.v1.ExactStorageHBase +import org.apache.s2graph.counter.core.v2.ExactStorageGraph +import org.apache.s2graph.counter.core._ +import org.apache.s2graph.counter._ +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.loader.core.CounterEtlItem +import org.apache.s2graph.counter.models.{CounterModel, DBModel, Counter} +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.{SparkApp, WithKafka} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import play.api.libs.json.Json +import scala.collection.mutable +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext + +object EraseDailyCounter extends SparkApp with WithKafka { + implicit val ec = ExecutionContext.Implicits.global + + + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + def valueToEtlItem(policy: Counter, key: ExactKeyTrait, values: ExactValueMap): Seq[CounterEtlItem] = { + if (values.nonEmpty) { + for { + (eq, value) <- filter(values.toList) + } yield { + CounterEtlItem(eq.tq.ts, policy.service, policy.action, key.itemKey, Json.toJson(eq.dimKeyValues), Json.toJson(Map("value" -> -value))) + } + } else { + Nil + } + } + + def filter(values: List[(ExactQualifier, Long)]): List[(ExactQualifier, Long)] = { + val sorted = values.sortBy(_._1.dimKeyValues.size).reverse + val (eq, value) = sorted.head + val dimKeys = eq.dimKeyValues.toSeq + val flat = { + for { + i <- 0 to dimKeys.length + comb <- dimKeys.combinations(i) + } yield { + ExactQualifier(eq.tq, comb.toMap) -> value + } + }.toMap + + // println("flat >>>", flat) + + val valuesMap = values.toMap + val remain = (valuesMap ++ flat.map { case (k, v) => + k -> (valuesMap(k) - v) + }).filter(_._2 > 0).toList + + // println("remain >>>", remain) + + if (remain.isEmpty) { + List((eq, value)) + } else { + (eq, value) :: filter(remain) + } + } + + def produce(policy: Counter, exactRdd: RDD[(ExactKeyTrait, ExactValueMap)]): Unit = { + exactRdd.mapPartitions { part => + for { + (key, values) <- part + item <- valueToEtlItem(policy, key, values) + } yield { + item + } + }.foreachPartition { part => + val m = MutableHashMap.empty[Int, 
mutable.MutableList[CounterEtlItem]] + part.foreach { item => + val k = getPartKey(item.item, 20) + val values = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem]) + values += item + m.update(k, values) + } + m.foreach { case (k, v) => + v.map(_.toKafkaMessage).grouped(1000).foreach { grouped => + // println(grouped) + producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n"))) + } + } + } + } + + def rddToExactRdd(policy: Counter, date: String, rdd: RDD[String]): RDD[(ExactKeyTrait, ExactValueMap)] = { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd") + val fromTs = dateFormat.parse(date).getTime + val toTs = fromTs + 23 * 60 * 60 * 1000 + + rdd.mapPartitions { part => + val exactCounter = policy.version match { + case VERSION_1 => new ExactCounter(S2ConfigFactory.config, new ExactStorageHBase(S2ConfigFactory.config)) + case VERSION_2 => new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) + } + + for { + line <- part + FetchedCounts(exactKey, qualifierWithCountMap) <- exactCounter.getCount(policy, line, Array(TimedQualifier.IntervalUnit.DAILY), fromTs, toTs) + } yield { + (exactKey, qualifierWithCountMap) + } + } + } + + lazy val className = getClass.getName.stripSuffix("$") + + override def run(): Unit = { + validateArgument("service", "action", "date", "file", "op") + DBModel.initialize(S2ConfigFactory.config) + + val (service, action, date, file, op) = (args(0), args(1), args(2), args(3), args(4)) + val conf = sparkConf(s"$className: $service.$action") + + val ctx = new SparkContext(conf) + + val rdd = ctx.textFile(file, 20) + + val counterModel = new CounterModel(S2ConfigFactory.config) + + val policy = counterModel.findByServiceAction(service, action).get + val exactRdd = rddToExactRdd(policy, date, rdd) + produce(policy, exactRdd) + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala new file mode 100644 index 0000000..8c9a8dd --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/config/StreamingConfig.scala @@ -0,0 +1,24 @@ +package org.apache.s2graph.counter.loader.config + +import org.apache.s2graph.counter.config.ConfigFunctions +import org.apache.s2graph.spark.config.S2ConfigFactory + +object StreamingConfig extends ConfigFunctions(S2ConfigFactory.config) { + // kafka + val KAFKA_ZOOKEEPER = getOrElse("kafka.zookeeper", "localhost") + val KAFKA_BROKERS = getOrElse("kafka.brokers", "localhost") + val KAFKA_TOPIC_GRAPH = getOrElse("kafka.topic.graph", "s2graphInalpha") + val KAFKA_TOPIC_ETL = getOrElse("kafka.topic.etl", "s2counter-etl-alpha") + val KAFKA_TOPIC_COUNTER = getOrElse("kafka.topic.counter", "s2counter-alpha") + val KAFKA_TOPIC_COUNTER_TRX = getOrElse("kafka.topic.counter-trx", "s2counter-trx-alpha") + val KAFKA_TOPIC_COUNTER_FAIL = getOrElse("kafka.topic.counter-fail", "s2counter-fail-alpha") + + // profile cache + val PROFILE_CACHE_TTL_SECONDS = getOrElse("profile.cache.ttl.seconds", 60 * 60 * 24) // default 1 day + val PROFILE_CACHE_MAX_SIZE = getOrElse("profile.cache.max.size", 10000) + val PROFILE_PREFETCH_SIZE = 
getOrElse("profile.prefetch.size", 10) + + // graph url + val GRAPH_URL = getOrElse("s2graph.url", "") + val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala new file mode 100644 index 0000000..3e80e25 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala @@ -0,0 +1,86 @@ +package org.apache.s2graph.counter.loader.core + +import org.apache.s2graph.core.{Edge, Graph, GraphUtil} +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.models.CounterModel +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.spark.Logging +import play.api.libs.json._ +import scala.collection.mutable.{HashMap => MutableHashMap} + +object CounterEtlFunctions extends Logging { + lazy val filterOps = Seq("insert", "insertBulk", "update", "increment").map(op => GraphUtil.operations(op)) + lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE + lazy val config = S2ConfigFactory.config + lazy val counterModel = new CounterModel(config) + + def logToEdge(line: String): Option[Edge] = { + for { + elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge] + edge <- Some(elem.asInstanceOf[Edge]).filter { x => + filterOps.contains(x.op) + } + } yield { + edge + } + } + + def parseEdgeFormat(line: String): Option[CounterEtlItem] = { + /** + * 1427082276804 insert edge 19073318 52453027_93524145648511699 story_user_ch_doc_view {"doc_type" : "l", "channel_subscribing" : "y", "view_from" : "feed"} + */ + for { + elem <- Graph.toGraphElement(line) if elem.isInstanceOf[Edge] + edge <- Some(elem.asInstanceOf[Edge]).filter { x => + filterOps.contains(x.op) + } + } yield { + val label = edge.label + val labelName = label.label + val tgtService = label.tgtColumn.service.serviceName + val tgtId = edge.tgtVertex.innerId.toString() + val srcId = edge.srcVertex.innerId.toString() + + // make empty property if no exist edge property + val dimension = Json.parse(Some(GraphUtil.split(line)).filter(_.length >= 7).map(_(6)).getOrElse("{}")) + val bucketKeys = Seq("_from") + val bucketKeyValues = { + for { + variable <- bucketKeys + } yield { + val jsValue = variable match { + case "_from" => JsString(srcId) + case s => dimension \ s + } + s"[[$variable]]" -> jsValue + } + } + val property = Json.toJson(bucketKeyValues :+ ("value" -> JsString("1")) toMap) +// val property = Json.toJson(Map("_from" -> srcId, "_to" -> tgtId, "value" -> "1")) + + CounterEtlItem(edge.ts, tgtService, labelName, tgtId, dimension, property) + } + } + + def parseEdgeFormat(lines: List[String]): List[CounterEtlItem] = { + for { + line <- lines + item <- parseEdgeFormat(line) + } yield { + item + } + } + + def checkPolicyAndMergeDimension(service: String, action: String, items: List[CounterEtlItem]): List[CounterEtlItem] = { + counterModel.findByServiceAction(service, action).map { policy => + if (policy.useProfile) { + policy.bucketImpId match { + case Some(_) => DimensionProps.mergeDimension(policy, items) + case None => Nil + } + } else { + items + } + }.getOrElse(Nil) 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala new file mode 100644 index 0000000..7a1ebb7 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlItem.scala @@ -0,0 +1,39 @@ +package org.apache.s2graph.counter.loader.core + +import org.apache.s2graph.counter.util.UnitConverter +import org.slf4j.LoggerFactory +import play.api.libs.json._ +import scala.util.{Failure, Success, Try} + +case class CounterEtlItem(ts: Long, service: String, action: String, item: String, dimension: JsValue, property: JsValue, useProfile: Boolean = false) { + def toKafkaMessage: String = { + s"$ts\t$service\t$action\t$item\t${dimension.toString()}\t${property.toString()}" + } + + lazy val value = { + property \ "value" match { + case JsNumber(n) => n.longValue() + case JsString(s) => s.toLong + case _: JsUndefined => 1L + case _ => throw new Exception("wrong type") + } + } + } + +object CounterEtlItem { + val log = LoggerFactory.getLogger(this.getClass) + + def apply(line: String): Option[CounterEtlItem] = { + Try { + val Array(ts, service, action, item, dimension, property) = line.split('\t') + CounterEtlItem(UnitConverter.toMillis(ts.toLong), service, action, item, Json.parse(dimension), Json.parse(property)) + } match { + case Success(item) => + Some(item) + case Failure(ex) => + log.error(">>> failed") + log.error(s"${ex.toString}: $line") + None + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala new file mode 100644 index 0000000..59ac841 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterFunctions.scala @@ -0,0 +1,446 @@ +package org.apache.s2graph.counter.loader.core + +import kafka.producer.KeyedMessage +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.counter.TrxLog +import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap +import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap +import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit +import org.apache.s2graph.counter.core._ +import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph} +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.loader.models.DefaultCounterModel +import org.apache.s2graph.counter.models.{Counter, DBModel} +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.WithKafka +import org.apache.spark.rdd.RDD +import org.apache.spark.{Accumulable, Logging} +import play.api.libs.json.{JsNumber, JsString, JsValue, Json} +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext +import scala.language.postfixOps +import scala.util.Try + +object CounterFunctions extends 
Logging with WithKafka { + + private val K_MAX = 500 + implicit val ec = ExecutionContext.Implicits.global + + val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) + val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) + + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + + type HashMapAccumulable = Accumulable[MutableHashMap[String, Long], (String, Long)] + + val initialize = { + logInfo("initialize CounterFunctions") + DBModel.initialize(S2ConfigFactory.config) + true + } + + def getCountValue(policy: Counter, item: CounterEtlItem): ExactValueMap = { + for { + dimKeys <- policy.dimensionList + dimValues <- getDimensionValues(item.dimension, dimKeys).toSeq + eq <- ExactQualifier.getQualifiers(policy.intervals.map(IntervalUnit.withName), item.ts, dimKeys.zip(dimValues).toMap) + } yield { + eq -> item.value + } + }.toMap + + def getDimensionValues(dimension: JsValue, keys: Array[String]): Option[Array[String]] = { + Try { + for { + k <- keys + jsValue = dimension \ k + } yield { + jsValue match { + case JsNumber(n) => n.toString() + case JsString(s) => s + case _ => throw new Exception() + } + } + }.toOption + } + + def exactMapper(item: CounterEtlItem): Option[(ExactKeyTrait, ExactValueMap)] = { + DefaultCounterModel.findByServiceAction(item.service, item.action).map { policy => + (ExactKey(policy, item.item, checkItemType = true), getCountValue(policy, item)) + } + } + + def rankingMapper(row: ItemRankingRow): Seq[(RankingKey, RankingValueMap)] = { + // normal ranking + for { + (eq, rv) <- row.value + } yield { + (RankingKey(row.key.policyId, row.key.version, eq), Map(row.key.itemKey -> rv)) + } + }.toSeq + + def logToRankValue(log: TrxLog): Option[(ExactKeyTrait, Map[ExactQualifier, RankingValue])] = { + DefaultCounterModel.findById(log.policyId).map { policy => + val key = ExactKey(policy, log.item, checkItemType = false) + val value = { + for { + result <- log.results + } yield { + ExactQualifier(TimedQualifier(result.interval, result.ts), result.dimension) -> RankingValue(result.result, result.value) + } + }.toMap + key -> value + } + } + + def reduceValue[T, U](op: (U, U) => U, default: U)(m1: Map[T, U], m2: Map[T, U]): Map[T, U] = { + m1 ++ m2.map { case (k, v) => + k -> op(m1.getOrElse(k, default), v) + } + } + + def makeExactRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = { + rdd.mapPartitions { part => + assert(initialize) + for { + (k, v) <- part + line <- GraphUtil.parseString(v) + item <- CounterEtlItem(line).toSeq + ev <- exactMapper(item).toSeq + } yield { + ev + } + }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions) + } + + def makeRankingRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { + val logRdd = makeTrxLogRdd(rdd, numPartitions) + makeRankingRddFromTrxLog(logRdd, numPartitions) + } + + def makeRankingRddFromTrxLog(rdd: RDD[TrxLog], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { + val itemRankingRdd = makeItemRankingRdd(rdd, numPartitions).cache() + try { + rankingCount(itemRankingRdd, numPartitions) union + rateRankingCount(itemRankingRdd, numPartitions) union + trendRankingCount(itemRankingRdd, numPartitions) coalesce numPartitions + } finally { + itemRankingRdd.unpersist(false) + } + } + + def makeTrxLogRdd(rdd: RDD[(String, String)], numPartitions: Int): RDD[TrxLog] = { + rdd.mapPartitions { part 
=> + assert(initialize) + for { + (k, v) <- part + line <- GraphUtil.parseString(v) + trxLog = Json.parse(line).as[TrxLog] if trxLog.success + } yield { + trxLog + } + } + } + + def rankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { + rdd.mapPartitions { part => + for { + row <- part + rv <- rankingMapper(row) + } yield { + rv + } + }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions) + } + + case class ItemRankingRow(key: ExactKeyTrait, value: Map[ExactQualifier, RankingValue]) + + def makeItemRankingRdd(rdd: RDD[TrxLog], numPartitions: Int): RDD[ItemRankingRow] = { + rdd.mapPartitions { part => + for { + log <- part + rv <- logToRankValue(log) + } yield { + rv + } + }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions).mapPartitions { part => + for { + (key, value) <- part + } yield { + ItemRankingRow(key, value) + } + } + } + + def mapTrendRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = { + for { + row <- rows + trendPolicy <- DefaultCounterModel.findByTrendActionId(row.key.policyId) + } yield { + val key = ExactKey(trendPolicy, row.key.itemKey, checkItemType = false) + val value = row.value.filter { case (eq, rv) => + // eq filter by rate policy dimension + trendPolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys } + }.map { case (eq, rv) => + eq -> RateRankingValue(rv.score, -1) + } + (key, value) + } + } + + def mapRateRankingValue(rows: Seq[ItemRankingRow]): Seq[(ExactKeyTrait, Map[ExactQualifier, RateRankingValue])] = { + val actionPart = { + for { + row <- rows + ratePolicy <- DefaultCounterModel.findByRateActionId(row.key.policyId) + } yield { + val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false) + val value = row.value.filter { case (eq, rv) => + // eq filter by rate policy dimension + ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys } + }.map { case (eq, rv) => + eq -> RateRankingValue(rv.score, -1) + } + (key, value) + } + } + + val basePart = { + for { + row <- rows + ratePolicy <- DefaultCounterModel.findByRateBaseId(row.key.policyId) + } yield { + val key = ExactKey(ratePolicy, row.key.itemKey, checkItemType = false) + val value = row.value.filter { case (eq, rv) => + // eq filter by rate policy dimension + ratePolicy.dimensionSet.exists { dimSet => dimSet == eq.dimKeyValues.keys } + }.map { case (eq, rv) => + eq -> RateRankingValue(-1, rv.score) + } + (key, value) + } + } + + actionPart ++ basePart + } + + def trendRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { + rdd.mapPartitions { part => + mapTrendRankingValue(part.toSeq) toIterator + }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part => + val missingByPolicy = { + for { + (key, value) <- part.toSeq + trendPolicy <- DefaultCounterModel.findById(key.policyId).toSeq + actionId <- trendPolicy.rateActionId.toSeq + actionPolicy <- DefaultCounterModel.findById(actionId).toSeq + } yield { + // filter total eq + val missingQualifiersWithRRV = value.filterKeys { eq => eq.tq.q != IntervalUnit.TOTAL } + (actionPolicy, key, missingQualifiersWithRRV) + } + }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3))) + + val filled = { + for { + (policy, missing) <- missingByPolicy.toSeq + keyWithPast = exactCounter.getPastCounts(policy, missing.map { case (k, v) => k.itemKey -> 
v.keys.toSeq }) + (key, current) <- missing + } yield { + val past = keyWithPast.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long]) + val base = past.mapValues(l => RateRankingValue(-1, l)) +// log.warn(s"trend: $policy $key -> $current $base") + key -> reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, base) + } + } + +// filled.foreach(println) + + { + // filter by rate threshold + for { + (key, value) <- filled + ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq + (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue) + } yield { + (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue)) + } + } toIterator + }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions) + } + + def rateRankingCount(rdd: RDD[ItemRankingRow], numPartitions: Int): RDD[(RankingKey, RankingValueMap)] = { + rdd.mapPartitions { part => + mapRateRankingValue(part.toSeq) toIterator + }.reduceByKey(reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(_, _), numPartitions).mapPartitions { part => + val seq = part.toSeq +// seq.foreach(x => println(s"item ranking row>> $x")) + + // find and evaluate action value is -1 + val actionMissingByPolicy = { + for { + (key, value) <- seq if value.exists { case (eq, rrv) => rrv.actionScore == -1 } + ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq + actionId <- ratePolicy.rateActionId.toSeq + actionPolicy <- DefaultCounterModel.findById(actionId) + } yield { + (actionPolicy, key, value.filter { case (eq, rrv) => rrv.actionScore == -1 }) + } + }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3))) + + val actionFilled = { + for { + (actionPolicy, actionMissing) <- actionMissingByPolicy.toSeq + keyWithRelated = exactCounter.getRelatedCounts(actionPolicy, actionMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq }) + (key, current) <- actionMissing + } yield { + val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long]) + val found = related.mapValues(l => RateRankingValue(l, -1)) + val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found) +// log.warn(s"action: $key -> $found $filled") + key -> filled + } + } + +// actionFilled.foreach(x => println(s"action filled>> $x")) + + // find and evaluate base value is -1 + val baseMissingByPolicy = { + for { + (key, value) <- seq if value.exists { case (eq, rrv) => rrv.baseScore == -1 } + ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq + baseId <- ratePolicy.rateBaseId.toSeq + basePolicy <- DefaultCounterModel.findById(baseId) + } yield { + (basePolicy, key, value.filter { case (eq, rrv) => rrv.baseScore == -1 }) + } + }.groupBy(_._1).mapValues(seq => seq.map(x => (x._2, x._3))) + + val baseFilled = { + for { + (basePolicy, baseMissing) <- baseMissingByPolicy.toSeq + keyWithRelated = exactCounter.getRelatedCounts(basePolicy, baseMissing.map { case (k, v) => k.itemKey -> v.keys.toSeq }) + (key, current) <- baseMissing + } yield { + val related = keyWithRelated.getOrElse(key.itemKey, Map.empty[ExactQualifier, Long]) + val found = related.mapValues(l => RateRankingValue(-1, l)) + val filled = reduceValue(RateRankingValue.reduce, RateRankingValue(-1, -1))(current, found) +// log.warn(s"base: $basePolicy $key -> $found $filled") + key -> filled + } + } + +// baseFilled.foreach(x => println(s"base filled>> $x")) + + val alreadyFilled = { + for { + (key, value) <- seq if value.exists { case (eq, rrv) => 
rrv.actionScore != -1 && rrv.baseScore != -1 } + } yield { + key -> value.filter { case (eq, rrv) => rrv.actionScore != -1 && rrv.baseScore != -1 } + } + } + + val rtn = { + // filter by rate threshold + for { + (key, value) <- actionFilled ++ baseFilled ++ alreadyFilled + ratePolicy <- DefaultCounterModel.findById(key.policyId).toSeq + (eq, rrv) <- value if rrv.baseScore >= ratePolicy.rateThreshold.getOrElse(Int.MinValue) + } yield { + (RankingKey(key.policyId, key.version, eq), Map(key.itemKey -> rrv.rankingValue)) + } + } + rtn.toIterator + }.reduceByKey(reduceValue(RankingValue.reduce, RankingValue(0, 0))(_, _), numPartitions) + } + + def insertBlobValue(keys: Seq[BlobExactKey], acc: HashMapAccumulable): Unit = { + val keyByPolicy = { + for { + key <- keys + policy <- DefaultCounterModel.findById(key.policyId) + } yield { + (policy, key) + } + }.groupBy(_._1).mapValues(values => values.map(_._2)) + + for { + (policy, allKeys) <- keyByPolicy + keys <- allKeys.grouped(10) + success <- exactCounter.insertBlobValue(policy, keys) + } yield { + success match { + case true => acc += ("BLOB", 1) + case false => acc += ("BLOBFailed", 1) + } + } + } + + def updateExactCounter(counts: Seq[(ExactKeyTrait, ExactValueMap)], acc: HashMapAccumulable): Seq[TrxLog] = { + val countsByPolicy = { + for { + (key, count) <- counts + policy <- DefaultCounterModel.findById(key.policyId) + } yield { + (policy, (key, count)) + } + }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2)) + + for { + (policy, allCounts) <- countsByPolicy + counts <- allCounts.grouped(10) + trxLog <- exactCounter.updateCount(policy, counts) + } yield { + trxLog.success match { + case true => acc += (s"ExactV${policy.version}", 1) + case false => acc += (s"ExactFailedV${policy.version}", 1) + } + trxLog + } + }.toSeq + + def exactCountFromEtl(rdd: RDD[CounterEtlItem], numPartitions: Int): RDD[(ExactKeyTrait, ExactValueMap)] = { + rdd.mapPartitions { part => + for { + item <- part + ev <- exactMapper(item).toSeq + } yield { + ev + } + }.reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions) + } + + def updateRankingCounter(values: TraversableOnce[(RankingKey, RankingValueMap)], acc: HashMapAccumulable): Unit = { + assert(initialize) + val valuesByPolicy = { + for { + (key, value) <- values.toSeq + policy <- DefaultCounterModel.findById(key.policyId) + if policy.useRank && rankingCounter.ready(policy) // update only rank counter enabled and ready + } yield { + (policy, (key, value)) + } + }.groupBy { case (policy, _) => policy }.mapValues(values => values.map(_._2)) + + for { + (policy, allValues) <- valuesByPolicy + groupedValues <- allValues.grouped(10) + } { + rankingCounter.update(groupedValues, K_MAX) + acc += (s"RankingV${policy.version}", groupedValues.length) + } + } + + def produceTrxLog(trxLogs: TraversableOnce[TrxLog]): Unit = { + for { + trxLog <- trxLogs + } { + val topic = trxLog.success match { + case true => StreamingConfig.KAFKA_TOPIC_COUNTER_TRX + case false => StreamingConfig.KAFKA_TOPIC_COUNTER_FAIL + } + val msg = new KeyedMessage[String, String](topic, s"${trxLog.policyId}${trxLog.item}", Json.toJson(trxLog).toString()) + producer.send(msg) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala ---------------------------------------------------------------------- diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala new file mode 100644 index 0000000..b1ebe50 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala @@ -0,0 +1,150 @@ +package org.apache.s2graph.counter.loader.core + +import org.apache.commons.httpclient.HttpStatus +import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service} +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.models.Counter +import org.apache.s2graph.counter.util.{RetryAsync, CollectionCache, CollectionCacheConfig} +import org.slf4j.LoggerFactory +import play.api.libs.json._ +import scala.annotation.tailrec +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.Try + +object DimensionProps { + // using play-ws without play app + private val builder = new com.ning.http.client.AsyncHttpClientConfig.Builder() + private val client = new play.api.libs.ws.ning.NingWSClient(builder.build) + private val log = LoggerFactory.getLogger(this.getClass) + + private val retryCnt = 3 + + val cacheConfig = CollectionCacheConfig(StreamingConfig.PROFILE_CACHE_MAX_SIZE, + StreamingConfig.PROFILE_CACHE_TTL_SECONDS, + negativeCache = true, + 3600 // negative ttl 1 hour + ) + val cache: CollectionCache[Option[JsValue]] = new CollectionCache[Option[JsValue]](cacheConfig) + + @tailrec + private[counter] def makeRequestBody(requestBody: String, keyValues: List[(String, String)]): String = { + keyValues match { + case head :: tail => + makeRequestBody(requestBody.replace(head._1, head._2), tail) + case Nil => requestBody + } + } + + private[counter] def query(bucket: Bucket, item: CounterEtlItem): Future[Option[JsValue]] = { + val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields) + .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") } + .map { case (key, jsValue) => + val replacement = jsValue match { + case JsString(s) => s + case value => value.toString() + } + key -> replacement + }.toList + + val cacheKey = s"${bucket.impressionId}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_") + + cache.withCacheAsync(cacheKey) { + val retryFuture = RetryAsync(retryCnt, withSleep = false) { + val future = bucket.httpVerb.toUpperCase match { + case "GET" => + client.url(bucket.apiPath).get() + case "POST" => + val newBody = makeRequestBody(bucket.requestBody, keyValues) + client.url(bucket.apiPath).post(Json.parse(newBody)) + } + + future.map { resp => + resp.status match { + case HttpStatus.SC_OK => + val json = Json.parse(resp.body) + for { + results <- (json \ "results").asOpt[Seq[JsValue]] + result <- results.headOption + props <- (result \ "props").asOpt[JsValue] + } yield { + props + } + case _ => + log.error(s"${resp.body}(${resp.status}}) item: $item") + None + } + } + } + + // if fail to retry + retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") } + + retryFuture + } + } + + private[counter] def query(service: Service, experiment: Experiment, item: CounterEtlItem): Future[Option[JsValue]] = { + val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields) + .filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }.toMap + + val cacheKey = s"${experiment.name}=" + keyValues.flatMap(x => 
Seq(x._1, x._2)).mkString("_") + + cache.withCacheAsync(cacheKey) { + val retryFuture = RetryAsync(retryCnt, withSleep = false) { + val url = s"${StreamingConfig.GRAPH_URL}/graphs/experiment/${service.accessToken}/${experiment.name}/0" + val future = client.url(url).post(Json.toJson(keyValues)) + + future.map { resp => + resp.status match { + case HttpStatus.SC_OK => + val json = Json.parse(resp.body) + for { + results <- (json \ "results").asOpt[Seq[JsValue]] + result <- results.headOption + props <- (result \ "props").asOpt[JsValue] + } yield { + props + } + case _ => + log.error(s"${resp.body}(${resp.status}}) item: $item") + None + } + } + } + + // if fail to retry + retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") } + + retryFuture + } + } + + def mergeDimension(policy: Counter, items: List[CounterEtlItem]): List[CounterEtlItem] = { + for { + impId <- policy.bucketImpId + bucket <- Bucket.findByImpressionId(impId) + experiment <- Experiment.findById(bucket.experimentId) + service <- Try { Service.findById(experiment.serviceId) }.toOption + } yield { + val futures = { + for { + item <- items + } yield { + query(service, experiment, item).map { + case Some(jsValue) => + val newDimension = item.dimension.as[JsObject] ++ jsValue.as[JsObject] + item.copy(dimension = newDimension) + case None => item + } + } + } + Await.result(Future.sequence(futures), 10 seconds) + } + }.getOrElse(items) + + def getCacheStatsString: String = { + cache.getStatsString + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala new file mode 100644 index 0000000..5908e1c --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/models/DefaultCounterModel.scala @@ -0,0 +1,6 @@ +package org.apache.s2graph.counter.loader.models + +import org.apache.s2graph.counter.models.CounterModel +import org.apache.s2graph.spark.config.S2ConfigFactory + +case object DefaultCounterModel extends CounterModel(S2ConfigFactory.config) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala new file mode 100644 index 0000000..12f5f73 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/EtlStreaming.scala @@ -0,0 +1,114 @@ +package org.apache.s2graph.counter.loader.stream + +import kafka.producer.KeyedMessage +import kafka.serializer.StringDecoder +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.loader.core.{DimensionProps, CounterEtlItem, CounterEtlFunctions} +import org.apache.s2graph.counter.models.{DBModel, CounterModel} +import org.apache.s2graph.spark.config.S2ConfigFactory +import 
org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions +import org.apache.spark.streaming.kafka.StreamHelper +import scala.collection.mutable +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.concurrent.ExecutionContext + +object EtlStreaming extends SparkApp with WithKafka { + lazy val config = S2ConfigFactory.config + lazy val s2Config = new S2CounterConfig(config) + lazy val counterModel = new CounterModel(config) + lazy val className = getClass.getName.stripSuffix("$") + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + + implicit val graphEx = ExecutionContext.Implicits.global + + val initialize = { + println("streaming initialize") +// Graph(config) + DBModel.initialize(config) + true + } + + val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_ETL) + val strInputTopics = inputTopics.mkString(",") + val groupId = buildKafkaGroupId(strInputTopics, "etl_to_counter") + val kafkaParam = Map( + "group.id" -> groupId, + "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, + "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, + "zookeeper.connection.timeout.ms" -> "10000" + ) + val streamHelper = StreamHelper(kafkaParam) + + override def run(): Unit = { + validateArgument("interval") + val (intervalInSec) = seconds(args(0).toLong) + + val conf = sparkConf(s"$strInputTopics: $className") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + /** + * read message from etl topic and join user profile from graph and then produce whole message to counter topic + */ + val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics) + + // etl logic + stream.foreachRDD { (rdd, ts) => + rdd.foreachPartitionWithOffsetRange { case (osr, part) => + assert(initialize) + + // convert to edge format + val items = { + for { + (k, v) <- part + line <- GraphUtil.parseString(v) + item <- CounterEtlFunctions.parseEdgeFormat(line) + } yield { + acc += ("Edges", 1) + item + } + } + + // join user profile + val joinItems = items.toList.groupBy { e => + (e.service, e.action) + }.flatMap { case ((service, action), v) => + CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v) + } + + // group by kafka partition key and send to kafka + val m = MutableHashMap.empty[Int, mutable.MutableList[CounterEtlItem]] + joinItems.foreach { item => + if (item.useProfile) { + acc += ("ETL", 1) + } + val k = getPartKey(item.item, 20) + val values: mutable.MutableList[CounterEtlItem] = m.getOrElse(k, mutable.MutableList.empty[CounterEtlItem]) + values += item + m.update(k, values) + } + m.foreach { case (k, v) => + v.map(_.toKafkaMessage).grouped(1000).foreach { grouped => + acc += ("Produce", grouped.size) + producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_COUNTER, null, k, grouped.mkString("\n"))) + } + } + + streamHelper.commitConsumerOffset(osr) + } + + if (ts.milliseconds / 1000 % 60 == 0) { + log.warn(DimensionProps.getCacheStatsString) + } + } + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala 
---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala new file mode 100644 index 0000000..3eea406 --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/ExactCounterStreaming.scala @@ -0,0 +1,69 @@ +package org.apache.s2graph.counter.loader.stream + +import kafka.serializer.StringDecoder +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.loader.core.CounterFunctions +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions +import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper} +import scala.collection.mutable.{HashMap => MutableHashMap} +import scala.language.postfixOps + +object ExactCounterStreaming extends SparkApp with WithKafka { + lazy val config = S2ConfigFactory.config + lazy val s2Config = new S2CounterConfig(config) + lazy val className = getClass.getName.stripSuffix("$") + + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + + val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER) + val strInputTopics = inputTopics.mkString(",") + val groupId = buildKafkaGroupId(strInputTopics, "counter_v2") + val kafkaParam = Map( +// "auto.offset.reset" -> "smallest", + "group.id" -> groupId, + "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, + "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, + "zookeeper.connection.timeout.ms" -> "10000" + ) + val streamHelper = StreamHelper(kafkaParam) + + override def run() = { + validateArgument("interval", "clear") + val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean) + + if (clear) { + streamHelper.kafkaHelper.consumerGroupCleanup() + } + + val conf = sparkConf(s"$strInputTopics: $className") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + // make stream + val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics) + stream.foreachRDD { (rdd, ts) => + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + val exactRDD = CounterFunctions.makeExactRdd(rdd, offsets.length) + + // for at-least once semantic + exactRDD.foreachPartitionWithIndex { (i, part) => + // update exact counter + val trxLogs = CounterFunctions.updateExactCounter(part.toSeq, acc) + CounterFunctions.produceTrxLog(trxLogs) + + // commit offset range + streamHelper.commitConsumerOffset(offsets(i)) + } + } + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala new file 
mode 100644 index 0000000..9e6d6be --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/GraphToETLStreaming.scala @@ -0,0 +1,80 @@ +package org.apache.s2graph.counter.loader.stream + +import kafka.producer.KeyedMessage +import kafka.serializer.StringDecoder +import org.apache.s2graph.core.GraphUtil +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions +import scala.collection.mutable +import scala.collection.mutable.{HashMap => MutableHashMap} + +object GraphToETLStreaming extends SparkApp with WithKafka { + lazy val config = S2ConfigFactory.config + lazy val s2Config = new S2CounterConfig(config) + lazy val className = getClass.getName.stripSuffix("$") + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + + override def run(): Unit = { + validateArgument("interval", "topic") + val (intervalInSec, topic) = (seconds(args(0).toLong), args(1)) + + val groupId = buildKafkaGroupId(topic, "graph_to_etl") + val kafkaParam = Map( +// "auto.offset.reset" -> "smallest", + "group.id" -> groupId, + "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, + "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, + "zookeeper.connection.timeout.ms" -> "10000" + ) + + val conf = sparkConf(s"$topic: $className") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) + + /** + * consume graphIn topic and produce messages to etl topic + * two purpose + * 1. partition by target vertex id + * 2. 
expand kafka partition count + */ + val stream = getStreamHelper(kafkaParam).createStream[String, String, StringDecoder, StringDecoder](ssc, topic.split(',').toSet) + stream.foreachRDD { rdd => + rdd.foreachPartitionWithOffsetRange { case (osr, part) => + val m = MutableHashMap.empty[Int, mutable.MutableList[String]] + for { + (k, v) <- part + line <- GraphUtil.parseString(v) + } { + try { + val sp = GraphUtil.split(line) + // get partition key by target vertex id + val partKey = getPartKey(sp(4), 20) + val values = m.getOrElse(partKey, mutable.MutableList.empty[String]) + values += line + m.update(partKey, values) + } catch { + case ex: Throwable => + log.error(s"$ex: $line") + } + } + + m.foreach { case (k, v) => + v.grouped(1000).foreach { grouped => + producer.send(new KeyedMessage[String, String](StreamingConfig.KAFKA_TOPIC_ETL, null, k, grouped.mkString("\n"))) + } + } + + getStreamHelper(kafkaParam).commitConsumerOffset(osr) + } + } + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala new file mode 100644 index 0000000..2c2335b --- /dev/null +++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/stream/RankingCounterStreaming.scala @@ -0,0 +1,72 @@ +package org.apache.s2graph.counter.loader.stream + +import kafka.serializer.StringDecoder +import org.apache.s2graph.counter.config.S2CounterConfig +import org.apache.s2graph.counter.loader.config.StreamingConfig +import org.apache.s2graph.counter.loader.core.CounterFunctions +import org.apache.s2graph.spark.config.S2ConfigFactory +import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} +import org.apache.spark.streaming.Durations._ +import org.apache.spark.streaming.kafka.KafkaRDDFunctions.rddToKafkaRDDFunctions +import org.apache.spark.streaming.kafka.{HasOffsetRanges, StreamHelper} +import scala.collection.mutable.{HashMap => MutableHashMap} + +object RankingCounterStreaming extends SparkApp with WithKafka { + lazy val config = S2ConfigFactory.config + lazy val s2Config = new S2CounterConfig(config) + lazy val className = getClass.getName.stripSuffix("$") + + lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS) + + val inputTopics = Set(StreamingConfig.KAFKA_TOPIC_COUNTER_TRX) + val strInputTopics = inputTopics.mkString(",") + val groupId = buildKafkaGroupId(strInputTopics, "ranking_v2") + val kafkaParam = Map( +// "auto.offset.reset" -> "smallest", + "group.id" -> groupId, + "metadata.broker.list" -> StreamingConfig.KAFKA_BROKERS, + "zookeeper.connect" -> StreamingConfig.KAFKA_ZOOKEEPER, + "zookeeper.connection.timeout.ms" -> "10000" + ) + val streamHelper = StreamHelper(kafkaParam) + + override def run() = { + validateArgument("interval", "clear") + val (intervalInSec, clear) = (seconds(args(0).toLong), args(1).toBoolean) + + if (clear) { + streamHelper.kafkaHelper.consumerGroupCleanup() + } + + val conf = sparkConf(s"$strInputTopics: $className") + val ssc = streamingContext(conf, intervalInSec) + val sc = ssc.sparkContext + + implicit val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], 
"Throughput")(HashMapParam[String, Long](_ + _)) + + // make stream + val stream = streamHelper.createStream[String, String, StringDecoder, StringDecoder](ssc, inputTopics) + stream.foreachRDD { (rdd, ts) => + // for at-least once semantic + val nextRdd = { + CounterFunctions.makeRankingRdd(rdd, sc.defaultParallelism).foreachPartition { part => + // update ranking counter + CounterFunctions.updateRankingCounter(part, acc) + } + rdd + } + + streamHelper.commitConsumerOffsets(nextRdd.asInstanceOf[HasOffsetRanges]) +// CounterFunctions.makeRankingRdd(rdd, offsets.length).foreachPartitionWithIndex { (i, part) => +// // update ranking counter +// CounterFunctions.updateRankingCounter(part, acc) +// +// // commit offset range +// streamHelper.commitConsumerOffset(offsets(i)) +// } + } + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala b/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala deleted file mode 100644 index ba5e863..0000000 --- a/s2counter_loader/src/main/scala/s2/config/StreamingConfig.scala +++ /dev/null @@ -1,24 +0,0 @@ -package s2.config - -/** - * Created by hsleep([email protected]) on 15. 4. 7.. - */ -object StreamingConfig extends ConfigFunctions(S2ConfigFactory.config) { - // kafka - val KAFKA_ZOOKEEPER = getOrElse("kafka.zookeeper", "localhost") - val KAFKA_BROKERS = getOrElse("kafka.brokers", "localhost") - val KAFKA_TOPIC_GRAPH = getOrElse("kafka.topic.graph", "s2graphInalpha") - val KAFKA_TOPIC_ETL = getOrElse("kafka.topic.etl", "s2counter-etl-alpha") - val KAFKA_TOPIC_COUNTER = getOrElse("kafka.topic.counter", "s2counter-alpha") - val KAFKA_TOPIC_COUNTER_TRX = getOrElse("kafka.topic.counter-trx", "s2counter-trx-alpha") - val KAFKA_TOPIC_COUNTER_FAIL = getOrElse("kafka.topic.counter-fail", "s2counter-fail-alpha") - - // profile cache - val PROFILE_CACHE_TTL_SECONDS = getOrElse("profile.cache.ttl.seconds", 60 * 60 * 24) // default 1 day - val PROFILE_CACHE_MAX_SIZE = getOrElse("profile.cache.max.size", 10000) - val PROFILE_PREFETCH_SIZE = getOrElse("profile.prefetch.size", 10) - - // graph url - val GRAPH_URL = getOrElse("s2graph.url", "") - val GRAPH_READONLY_URL = getOrElse("s2graph.read-only.url", GRAPH_URL) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala b/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala deleted file mode 100644 index 2843022..0000000 --- a/s2counter_loader/src/main/scala/s2/counter/CounterBulkLoader.scala +++ /dev/null @@ -1,78 +0,0 @@ -package s2.counter - -import com.kakao.s2graph.core.{Graph, GraphUtil} -import org.apache.spark.SparkContext -import s2.config.{S2ConfigFactory, S2CounterConfig, StreamingConfig} -import s2.counter.core.{BlobExactKey, CounterEtlFunctions, CounterFunctions} -import s2.models.Counter.ItemType -import s2.models.{CounterModel, DBModel} -import s2.spark.{HashMapParam, SparkApp, WithKafka} - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.concurrent.ExecutionContext - -/** - * Created by rain on 7/1/15. 
- */
-object CounterBulkLoader extends SparkApp with WithKafka {
-  lazy val config = S2ConfigFactory.config
-  lazy val s2Config = new S2CounterConfig(config)
-  lazy val counterModel = new CounterModel(config)
-  lazy val className = getClass.getName.stripSuffix("$")
-  lazy val producer = getProducer[String, String](StreamingConfig.KAFKA_BROKERS)
-
-  implicit val graphEx = ExecutionContext.Implicits.global
-
-  val initialize = {
-    println("initialize")
-// Graph(config)
-    DBModel.initialize(config)
-    true
-  }
-
-  override def run(): Unit = {
-    val hdfsPath = args(0)
-    val blockSize = args(1).toInt
-    val minPartitions = args(2).toInt
-    val conf = sparkConf(s"$hdfsPath: CounterBulkLoader")
-
-    val sc = new SparkContext(conf)
-    val acc = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    val msgs = sc.textFile(hdfsPath)
-
-    val etlRdd = msgs.repartition(minPartitions).mapPartitions { part =>
-      // parse and etl
-      assert(initialize)
-      val items = {
-        for {
-          msg <- part
-          line <- GraphUtil.parseString(msg)
-          sp = GraphUtil.split(line) if sp.size <= 7 || GraphUtil.split(line)(7) != "in"
-          item <- CounterEtlFunctions.parseEdgeFormat(line)
-        } yield {
-          acc +=("Edges", 1)
-          item
-        }
-      }
-      items.grouped(blockSize).flatMap { grouped =>
-        grouped.groupBy(e => (e.service, e.action)).flatMap { case ((service, action), v) =>
-          CounterEtlFunctions.checkPolicyAndMergeDimension(service, action, v.toList)
-        }
-      }
-    }
-
-    val exactRdd = CounterFunctions.exactCountFromEtl(etlRdd, etlRdd.partitions.length)
-    val logRdd = exactRdd.mapPartitions { part =>
-      val seq = part.toSeq
-      CounterFunctions.insertBlobValue(seq.map(_._1).filter(_.itemType == ItemType.BLOB).map(_.asInstanceOf[BlobExactKey]), acc)
-      // update exact counter
-      CounterFunctions.updateExactCounter(seq, acc).toIterator
-    }
-
-    val rankRdd = CounterFunctions.makeRankingRddFromTrxLog(logRdd, logRdd.partitions.length)
-    rankRdd.foreachPartition { part =>
-      CounterFunctions.updateRankingCounter(part, acc)
-    }
-  }
-}
\ No newline at end of file
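
The bulk loader above composes the same primitives as the streaming jobs: exactCountFromEtl reduces ETL items into per-key value maps, updateExactCounter writes them and emits TrxLogs, and makeRankingRddFromTrxLog/updateRankingCounter propagate the result to the ranking counter. Both exactCountFromEtl and makeRankingRdd shuffle with reduceByKey(reduceValue(op, default)(_, _), n), so two value maps for the same key are merged entry by entry. Below is a minimal, self-contained sketch of that merge semantics, assuming reduceValue combines matching entries with op and falls back to default for keys present on only one side; mergeValueMaps is an illustrative stand-in, not the project helper.

object ReduceValueSketch {
  // Merge two value maps key by key: shared keys are combined with `op`,
  // keys present on only one side are combined against `default`.
  def mergeValueMaps[K, V](op: (V, V) => V, default: V)(m1: Map[K, V], m2: Map[K, V]): Map[K, V] =
    (m1.keySet ++ m2.keySet).iterator.map { k =>
      k -> op(m1.getOrElse(k, default), m2.getOrElse(k, default))
    }.toMap

  def main(args: Array[String]): Unit = {
    // Two partial counts for the same exact key, keyed by qualifier strings
    // (hypothetical qualifiers standing in for ExactQualifier).
    val a = Map("total" -> 3L, "d.2015-10-12" -> 1L)
    val b = Map("d.2015-10-12" -> 2L, "H.2015-10-12 09" -> 2L)

    // Same shape as reduceByKey(reduceValue[ExactQualifier, Long](_ + _, 0L)(_, _), numPartitions)
    println(mergeValueMaps[String, Long](_ + _, 0L)(a, b))
    // entries: total -> 3, d.2015-10-12 -> 3, H.2015-10-12 09 -> 2
  }
}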

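EtlStreaming and GraphToETLStreaming both batch their outgoing Kafka messages by partition key: records are grouped by getPartKey(id, 20), each group is split into chunks of up to 1000, and each chunk is sent as one newline-joined KeyedMessage. A small sketch of that grouping logic, under the assumption that the partition key is a non-negative hash of the id modulo the partition count (getPartKey itself comes from the WithKafka helper and is not part of this diff); partKey and batchByPartition are illustrative names.

object PartitionBatchSketch {
  // Non-negative hash of the partition-key source, modulo the partition count
  // (an assumption; the real getPartKey is provided by WithKafka).
  def partKey(id: String, partitions: Int): Int =
    math.abs(id.hashCode % partitions)

  // Group records by partition key and join each batch of up to `batchSize`
  // records with '\n', the payload shape the streaming jobs send to Kafka.
  def batchByPartition(records: Seq[String], partitions: Int, batchSize: Int): Seq[(Int, String)] =
    records.groupBy(partKey(_, partitions)).toSeq.flatMap { case (k, lines) =>
      lines.grouped(batchSize).map(group => k -> group.mkString("\n"))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical records; in GraphToETLStreaming the key source is the target vertex id (sp(4)).
    val records = Seq("edge-to-item-1", "edge-to-item-2", "edge-to-item-1-again")
    batchByPartition(records, partitions = 20, batchSize = 1000).foreach { case (k, payload) =>
      println(s"partition key $k -> ${payload.split("\n").length} record(s)")
      // a real job would send: new KeyedMessage[String, String](topic, null, k, payload)
    }
  }
}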