http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala deleted file mode 100644 index 4970399..0000000 --- a/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala +++ /dev/null @@ -1,196 +0,0 @@ -package s2.counter.stream - -import com.kakao.s2graph.core.GraphUtil -import com.kakao.s2graph.core.mysqls.Label -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.{SparkConf, SparkContext} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import play.api.libs.json.Json -import s2.config.{S2ConfigFactory, S2CounterConfig} -import s2.counter.core.CounterFunctions.HashMapAccumulable -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core._ -import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph} -import s2.helper.CounterAdmin -import s2.models.{Counter, DBModel, DefaultCounterModel} -import s2.spark.HashMapParam - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.{Failure, Success} - -/** - * Created by hsleep([email protected]) on 2015. 11. 19.. - */ -class ExactCounterStreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - private val master = "local[2]" - private val appName = "exact_counter_streaming" - private val batchDuration = Seconds(1) - - private var sc: SparkContext = _ - private var ssc: StreamingContext = _ - - val admin = new CounterAdmin(S2ConfigFactory.config) - val graphOp = new GraphOperation(S2ConfigFactory.config) - val s2config = new S2CounterConfig(S2ConfigFactory.config) - - val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) - val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) - - val service = "test" - val action = "test_case" - - override def beforeAll(): Unit = { - DBModel.initialize(S2ConfigFactory.config) - - val conf = new SparkConf() - .setMaster(master) - .setAppName(appName) - - ssc = new StreamingContext(conf, batchDuration) - - sc = ssc.sparkContext - - // create test_case label - com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") - if (Label.findByName(action, useCache = false).isEmpty) { - val strJs = - s""" - |{ - | "label": "$action", - | "srcServiceName": "$service", - | "srcColumnName": "src", - | "srcColumnType": "string", - | "tgtServiceName": "$service", - | "tgtColumnName": "$action", - | "tgtColumnType": "string", - | "indices": [ - | ], - | "props": [ - | ] - |} - """.stripMargin - graphOp.createLabel(Json.parse(strJs)) - } - - // action - admin.deleteCounter(service, action).foreach { - case Success(v) => - case Failure(ex) => - println(s"$ex") - } - admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = true)) - } - - override def afterAll(): Unit = { - admin.deleteCounter(service, action) - if (ssc != null) { - ssc.stop() - } - } - - "ExactCounter" should "update" in { - val policy = DefaultCounterModel.findByServiceAction(service, action).get - val data = - s""" - |1434534565675 $service $action 70362200_94013572857366866 {"is_shared":"false","relationship":"FE"} {"userId":"48255079","userIdType":"profile_id","value":"1"} - |1434534565675 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"48255079","userIdType":"profile_id","value":"1"} - |1434534566220 $service $action 51223360_94013140590929619 {"is_shared":"false","relationship":"FE"} {"userId":"312383","userIdType":"profile_id","value":"1"} - |1434534566508 $service $action 63808459_94013420047377826 {"is_shared":"false","relationship":"FE"} {"userId":"21968241","userIdType":"profile_id","value":"1"} - |1434534566210 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"6062217","userIdType":"profile_id","value":"1"} - |1434534566459 $service $action 49699692_94012186431261763 {"is_shared":"false","relationship":"FE"} {"userId":"67863471","userIdType":"profile_id","value":"1"} - |1434534565681 $service $action 64556827_94012311028641810 {"is_shared":"false","relationship":"FE"} {"userId":"19381218","userIdType":"profile_id","value":"1"} - |1434534565865 $service $action 41814266_94012477588942163 {"is_shared":"false","relationship":"FE"} {"userId":"19268547","userIdType":"profile_id","value":"1"} - |1434534565865 $service $action 66697741_94007840665633458 {"is_shared":"false","relationship":"FE"} {"userId":"19268547","userIdType":"profile_id","value":"1"} - |1434534566142 $service $action 66444074_94012737377133826 {"is_shared":"false","relationship":"FE"} {"userId":"11917195","userIdType":"profile_id","value":"1"} - |1434534566077 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"37709890","userIdType":"profile_id","value":"1"} - |1434534565938 $service $action 40921487_94012905738975266 {"is_shared":"false","relationship":"FE"} {"userId":"59869223","userIdType":"profile_id","value":"1"} - |1434534566033 $service $action 64506628_93994707216829506 {"is_shared":"false","relationship":"FE"} {"userId":"50375575","userIdType":"profile_id","value":"1"} - |1434534566451 $service $action 40748868_94013448321919139 {"is_shared":"false","relationship":"FE"} {"userId":"12249539","userIdType":"profile_id","value":"1"} - |1434534566669 $service $action 64499956_94013227717457106 {"is_shared":"false","relationship":"FE"} {"userId":"25167419","userIdType":"profile_id","value":"1"} - |1434534566669 $service $action 66444074_94012737377133826 {"is_shared":"false","relationship":"FE"} {"userId":"25167419","userIdType":"profile_id","value":"1"} - |1434534566318 $service $action 64774665_94012837889027027 {"is_shared":"true","relationship":"F"} {"userId":"71557816","userIdType":"profile_id","value":"1"} - |1434534566274 $service $action 67075480_94008509166933763 {"is_shared":"false","relationship":"FE"} {"userId":"57931860","userIdType":"profile_id","value":"1"} - |1434534566659 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"19990823","userIdType":"profile_id","value":"1"} - |1434534566250 $service $action 70670053_93719933175630611 {"is_shared":"true","relationship":"F"} {"userId":"68897412","userIdType":"profile_id","value":"1"} - |1434534566402 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"15541439","userIdType":"profile_id","value":"1"} - |1434534566122 $service $action 48890741_94013463616012786 {"is_shared":"false","relationship":"FE"} {"userId":"48040409","userIdType":"profile_id","value":"1"} - |1434534566055 $service $action 64509008_94002318232678546 {"is_shared":"true","relationship":"F"} {"userId":"46532039","userIdType":"profile_id","value":"1"} - |1434534565994 $service $action 66644368_94009163363033795 {"is_shared":"false","relationship":"FE"} {"userId":"4143147","userIdType":"profile_id","value":"1"} - |1434534566448 $service $action 64587644_93938555963733954 {"is_shared":"false","relationship":"FE"} {"userId":"689042","userIdType":"profile_id","value":"1"} - |1434534565935 $service $action 52812511_94012009551561315 {"is_shared":"false","relationship":"FE"} {"userId":"35509692","userIdType":"profile_id","value":"1"} - |1434534566544 $service $action 70452048_94008573197583762 {"is_shared":"false","relationship":"FE"} {"userId":"5172421","userIdType":"profile_id","value":"1"} - |1434534565929 $service $action 54547023_94013384964278435 {"is_shared":"false","relationship":"FE"} {"userId":"33556498","userIdType":"profile_id","value":"1"} - |1434534566358 $service $action 46889329_94013502934177075 {"is_shared":"false","relationship":"FE"} {"userId":"8987346","userIdType":"profile_id","value":"1"} - |1434534566057 $service $action 67075480_94008509166933763 {"is_shared":"false","relationship":"FE"} {"userId":"35134964","userIdType":"profile_id","value":"1"} - |1434534566140 $service $action 54547023_94013384964278435 {"is_shared":"false","relationship":"FE"} {"userId":"11900315","userIdType":"profile_id","value":"1"} - |1434534566158 $service $action 64639374_93888330176053635 {"is_shared":"true","relationship":"F"} {"userId":"49996643","userIdType":"profile_id","value":"1"} - |1434534566025 $service $action 67265128_94009084771192002 {"is_shared":"false","relationship":"FE"} {"userId":"37801480","userIdType":"profile_id","value":"1"} - """.stripMargin.trim - // println(data) - val rdd = sc.parallelize(Seq(("", data))) - - // rdd.foreachPartition { part => - // part.foreach(println) - // } - val resultRdd = CounterFunctions.makeExactRdd(rdd, 2) - val result = resultRdd.collect().toMap - - // result.foreachPartition { part => - // part.foreach(println) - // } - - val parsed = { - for { - line <- GraphUtil.parseString(data) - item <- CounterEtlItem(line).toSeq - ev <- CounterFunctions.exactMapper(item).toSeq - } yield { - ev - } - } - val parsedResult = parsed.groupBy(_._1).mapValues(values => values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L))) - - // parsedResult.foreach { case (k, v) => - // println(k, v) - // } - - result should not be empty - result should equal (parsedResult) - - val itemId = "46889329_94013502934177075" - val key = ExactKey(DefaultCounterModel.findByServiceAction(service, action).get, itemId, checkItemType = true) - val value = result.get(key) - - value should not be empty - value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, String])) should equal (Some(6L)) - - exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (None) - - val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - resultRdd.foreachPartition { part => - CounterFunctions.updateExactCounter(part.toSeq, acc) - } - - Option(FetchedCountsGrouped(key, Map( - (IntervalUnit.TOTAL, Map.empty[String, String]) -> Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l) - ))).foreach { expected => - exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (Some(expected)) - } - Option(FetchedCountsGrouped(key, Map( - (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l) - ))).foreach { expected => - exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"))) should be (Some(expected)) - } - Option(FetchedCountsGrouped(key, Map( - (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l) - ))).foreach { expected => - exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("relationship" -> Set("FE"))) should be (Some(expected)) - } - Option(FetchedCountsGrouped(key, Map( - (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.relationship.false.FE") -> 6l) - ))).foreach { expected => - exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be (Some(expected)) - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala ---------------------------------------------------------------------- diff --git a/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala deleted file mode 100644 index 434673d..0000000 --- a/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala +++ /dev/null @@ -1,448 +0,0 @@ -package s2.counter.stream - -import com.kakao.s2graph.core.mysqls.Label -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.{SparkConf, SparkContext} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -import play.api.libs.json.Json -import s2.config.{S2ConfigFactory, S2CounterConfig} -import s2.counter.core.CounterFunctions.HashMapAccumulable -import s2.counter.core.TimedQualifier.IntervalUnit -import s2.counter.core._ -import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph} -import s2.helper.CounterAdmin -import s2.models.{Counter, DBModel, DefaultCounterModel} -import s2.spark.HashMapParam - -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.{Failure, Success} - -/** - * Created by hsleep([email protected]) on 15. 6. 17.. - */ -class RankingCounterStreamingSpec extends FlatSpec with BeforeAndAfterAll with Matchers { - private val master = "local[2]" - private val appName = "ranking_counter_streaming" - private val batchDuration = Seconds(1) - - private var sc: SparkContext = _ - private var ssc: StreamingContext = _ - - val admin = new CounterAdmin(S2ConfigFactory.config) - val graphOp = new GraphOperation(S2ConfigFactory.config) - val s2config = new S2CounterConfig(S2ConfigFactory.config) - - val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config)) - val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config)) - - val service = "test" - val action = "test_case" - val action_base = "test_case_base" - val action_rate = "test_case_rate" - val action_rate_threshold = "test_case_rate_threshold" - val action_trend = "test_case_trend" - - override def beforeAll(): Unit = { - DBModel.initialize(S2ConfigFactory.config) - - val conf = new SparkConf() - .setMaster(master) - .setAppName(appName) - - ssc = new StreamingContext(conf, batchDuration) - - sc = ssc.sparkContext - - admin.setupCounterOnGraph() - - // create test_case label - com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz") - if (Label.findByName(action, useCache = false).isEmpty) { - val strJs = - s""" - |{ - | "label": "$action", - | "srcServiceName": "$service", - | "srcColumnName": "src", - | "srcColumnType": "string", - | "tgtServiceName": "$service", - | "tgtColumnName": "$action", - | "tgtColumnType": "string", - | "indices": [ - | ], - | "props": [ - | ] - |} - """.stripMargin - graphOp.createLabel(Json.parse(strJs)) - } - if (Label.findByName(action_base, useCache = false).isEmpty) { - val strJs = - s""" - |{ - | "label": "$action_base", - | "srcServiceName": "$service", - | "srcColumnName": "src", - | "srcColumnType": "string", - | "tgtServiceName": "$service", - | "tgtColumnName": "$action", - | "tgtColumnType": "string", - | "indices": [ - | ], - | "props": [ - | ] - |} - """.stripMargin - graphOp.createLabel(Json.parse(strJs)) - } - - // action - admin.deleteCounter(service, action).foreach { - case Success(v) => - case Failure(ex) => - println(s"$ex") - } - admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true)) - val policy = DefaultCounterModel.findByServiceAction(service, action).get - - // action_base - admin.deleteCounter(service, action_base).foreach { - case Success(v) => - case Failure(ex) => - println(s"$ex") - } - admin.createCounter(Counter(useFlag = true, 2, service, action_base, Counter.ItemType.STRING, autoComb = true, "", useRank = true)) - val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get - - // action_rate - admin.deleteCounter(service, action_rate).foreach { - case Success(v) => - case Failure(ex) => - println(s"$ex") - } - admin.createCounter(Counter(useFlag = true, 2, service, action_rate, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true, - rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id))) - - // action_rate_threshold - admin.deleteCounter(service, action_rate_threshold).foreach { - case Success(v) => - case Failure(ex) => - println(s"$ex") - } - admin.createCounter(Counter(useFlag = true, 2, service, action_rate_threshold, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true, - rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id), rateThreshold = Some(3))) - - // action_trend - admin.deleteCounter(service, action_trend).foreach { - case Success(v) => - case Failure(ex) => - println(s"$ex") - } - admin.createCounter(Counter(useFlag = true, 2, service, action_trend, Counter.ItemType.STRING, autoComb = true, "p1", useRank = true, - rateActionId = Some(policy.id), rateBaseId = Some(policy.id))) - } - - override def afterAll(): Unit = { - admin.deleteCounter(service, action) - admin.deleteCounter(service, action_base) - admin.deleteCounter(service, action_rate) - admin.deleteCounter(service, action_rate_threshold) - admin.deleteCounter(service, action_trend) - if (ssc != null) { - ssc.stop() - } - } - - "RankingCounterStreaming" should "update" in { - val policy = DefaultCounterModel.findByServiceAction(service, action).get -// val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get - - rankingCounter.ready(policy) should equal (true) - val data = - s""" - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":3}]} - |{"success":false,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} - |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} - |{"success":true,"policyId":${policy.id},"item":"4","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} - """.stripMargin.trim - // println(data) - val rdd = sc.parallelize(Seq(("", data))) - - // rdd.foreachPartition { part => - // part.foreach(println) - // } - - val result = CounterFunctions.makeRankingRdd(rdd, 2).collect().toMap - - // result.foreachPartition { part => - // part.foreach(println) - // } - - val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - result should not be empty - val rankKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) - result should contain (rankKey -> Map( - "1" -> RankingValue(3, 1), - "3" -> RankingValue(2, 2), - "4" -> RankingValue(1, 1) - )) - - val key = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) - val value = result.get(key) - - value should not be empty - value.get.get("1").get should equal (RankingValue(3, 1)) - value.get.get("2") shouldBe empty - value.get.get("3").get should equal (RankingValue(2, 2)) - - rankingCounter.ready(policy) should equal (true) - - // delete, update and get - rankingCounter.delete(key) - Thread.sleep(1000) - CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc) - Thread.sleep(1000) - val rst = rankingCounter.getTopK(key) - - rst should not be empty -// rst.get.totalScore should equal(4f) - rst.get.values should contain allOf(("3", 2d), ("4", 1d), ("1", 3d)) - } - -// "rate by base" >> { -// val data = -// """ -// |{"success":true,"policyId":42,"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]} -// """.stripMargin.trim -// val rdd = sc.parallelize(Seq(("", data))) -// -// val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2).collect() -// trxLogRdd.foreach { log => -// CounterFunctions.rateBaseRankingMapper(log) must not be empty -// } -// -// true must_== true -// } - - it should "update rate ranking counter" in { - val policy = DefaultCounterModel.findByServiceAction(service, action).get - val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get - val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate).get - - // update base policy - val eq = ExactQualifier(TimedQualifier("M", 1433084400000l), "") - val exactKey = ExactKey(basePolicy, "1", checkItemType = true) - - // check base item count - exactCounter.updateCount(basePolicy, Seq( - (exactKey, Map(eq -> 2l)) - )) - Thread.sleep(1000) - - // direct get - val baseCount = exactCounter.getCount(basePolicy, "1", Seq(IntervalUnit.MONTHLY), 1433084400000l, 1433084400000l, Map.empty[String, Set[String]]) - baseCount should not be empty - baseCount.get should equal (FetchedCountsGrouped(exactKey, Map( - (eq.tq.q, Map.empty[String, String]) -> Map(eq-> 2l) - ))) - - // related get - val relatedCount = exactCounter.getRelatedCounts(basePolicy, Seq("1" -> Seq(eq))) - relatedCount should not be empty - relatedCount.get("1") should not be empty - relatedCount.get("1").get should equal (Map(eq -> 2l)) - - val data = - s""" - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]} - |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]} - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":2,"result":4}]} - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} - """.stripMargin.trim - // println(data) - val rdd = sc.parallelize(Seq(("", data))) - - // rdd.foreachPartition { part => - // part.foreach(println) - // } - - val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2) - trxLogRdd.count() should equal (data.trim.split('\n').length) - - val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2) - itemRankingRdd.foreach(println) - - val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id) - result.foreach(println) - result should have size 3 - - val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - // rate ranking - val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) - val value = result.get(key) - -// println(key, value) - - value should not be empty - value.get.get("1") should not be empty - value.get.get("1").get should equal (RankingValue(1, 0)) - value.get.get("2").get should equal (RankingValue(0.25, 0)) - - val key2 = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "p1.1")) - val value2 = result.get(key2) - -// println(key2, value2) - - val values = value.map(v => (key, v)).toSeq ++ value2.map(v => (key2, v)).toSeq - println(s"values: $values") - - // delete, update and get - rankingCounter.delete(key) - rankingCounter.delete(key2) - Thread.sleep(1000) - CounterFunctions.updateRankingCounter(values, acc) - // for update graph - Thread.sleep(1000) - - val rst = rankingCounter.getTopK(key) - rst should not be empty - rst.get.values should equal (Seq(("1", 1d), ("2", 0.25d))) - - val rst2 = rankingCounter.getTopK(key2) - rst2 should not be empty - rst2.get.values should equal (Seq(("2", 0.25d))) - } - - it should "update rate ranking counter with threshold" in { - val policy = DefaultCounterModel.findByServiceAction(service, action).get - val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get - val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate_threshold).get - - val data = - s""" - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]} - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]} - |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]} - |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]} - """.stripMargin.trim - // println(data) - val rdd = sc.parallelize(Seq(("", data))) - - // rdd.foreachPartition { part => - // part.foreach(println) - // } - - val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2) - trxLogRdd.count() should equal (data.trim.split('\n').length) - - val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2) - itemRankingRdd.foreach(println) - - val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id) - result.foreach(println) - result should have size 2 - - val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - // rate ranking - val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "")) - val value = result.get(key) - - value should not be empty - value.get.get("1") should be (None) - value.get.get("2").get should equal (RankingValue(0.25, 0)) - - // delete, update and get - rankingCounter.delete(key) - Thread.sleep(1000) - CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc) - Thread.sleep(1000) - val rst = rankingCounter.getTopK(key) - - rst should not be empty - rst.get.values should equal (Seq(("2", 0.25d))) - } - - it should "update trend ranking counter" in { - val policy = DefaultCounterModel.findByServiceAction(service, action).get - val trendPolicy = DefaultCounterModel.findByServiceAction(service, action_trend).get - - val exactKey1 = ExactKey(policy, "1", checkItemType = true) - val exactKey2 = ExactKey(policy, "2", checkItemType = true) - // update old key value - val tq1 = TimedQualifier("M", 1435676400000l) - val tq2 = TimedQualifier("M", 1427814000000l) - exactCounter.updateCount(policy, Seq( - exactKey1 -> Map(ExactQualifier(tq1.add(-1), "") -> 1l, ExactQualifier(tq2.add(-1), "") -> 92l) - )) - val eq1 = ExactQualifier(tq1, "") - val eq2 = ExactQualifier(tq2, "") - - val oldCount = exactCounter.getPastCounts(policy, Seq("1" -> Seq(eq1, eq2), "2" -> Seq(eq1, eq1.copy(dimension = "gender.M")))) - oldCount should not be empty - oldCount.get("1").get should equal(Map(eq1 -> 1l, eq2 -> 92l)) - oldCount.get("2") should be (None) - - val data = - s""" - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":2}]} - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1435676400000,"value":1,"result":1}]} - |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1427814000000,"value":1,"result":92}]} - """.stripMargin.trim - // println(data) - val rdd = sc.parallelize(Seq(("", data))) - - // rdd.foreachPartition { part => - // part.foreach(println) - // } - - val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2) - trxLogRdd.count() should equal (data.trim.split('\n').length) - - val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2) - itemRankingRdd.foreach(println) - - val result = CounterFunctions.trendRankingCount(itemRankingRdd, 2).collect().toMap - result.foreach(println) - // dimension gender.M is ignored, because gender is not defined dimension in trend policy. - result should have size 2 - - val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _)) - - // trend ranking - val key = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1435676400000L), "")) - val value = result.get(key) - - value should not be empty - value.get.get("1").get should equal (RankingValue(2, 0)) - value.get.get("2").get should equal (RankingValue(1, 0)) - - val key2 = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1427814000000L), "")) - val value2 = result.get(key2) - - value2 should not be empty - value2.get.get("1").get should equal (RankingValue(1, 0)) - - // delete, update and get - rankingCounter.delete(key) - Thread.sleep(1000) - CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc) - Thread.sleep(1000) - val rst = rankingCounter.getTopK(key) - - rst should not be empty - rst.get.values should equal (Seq("1" -> 2, "2" -> 1)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_netty/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/resources/application.conf b/s2rest_netty/src/main/resources/application.conf deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_netty/src/main/scala/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/Server.scala b/s2rest_netty/src/main/scala/Server.scala deleted file mode 100644 index 16477b1..0000000 --- a/s2rest_netty/src/main/scala/Server.scala +++ /dev/null @@ -1,202 +0,0 @@ -package com.kakao.s2graph.rest.netty - -import java.util.Map.Entry -import java.util.concurrent.Executors -import java.util.function.Consumer - -import com.kakao.s2graph.core.GraphExceptions.BadQueryException -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.Experiment -import com.kakao.s2graph.core.rest.RestHandler.HandlerResult -import com.kakao.s2graph.core.rest._ -import com.kakao.s2graph.core.utils.Extensions._ -import com.kakao.s2graph.core.utils.logger -import com.typesafe.config.ConfigFactory -import io.netty.bootstrap.ServerBootstrap -import io.netty.buffer.{ByteBuf, Unpooled} -import io.netty.channel._ -import io.netty.channel.nio.NioEventLoopGroup -import io.netty.channel.socket.SocketChannel -import io.netty.channel.socket.nio.NioServerSocketChannel -import io.netty.handler.codec.http.HttpHeaders._ -import io.netty.handler.codec.http._ -import io.netty.handler.logging.{LogLevel, LoggingHandler} -import io.netty.util.CharsetUtil -import play.api.libs.json._ - -import scala.collection.mutable -import scala.concurrent.ExecutionContext -import scala.io.Source -import scala.util.{Failure, Success, Try} - -class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser { - val ApplicationJson = "application/json" - - val Ok = HttpResponseStatus.OK - val CloseOpt = Option(ChannelFutureListener.CLOSE) - val BadRequest = HttpResponseStatus.BAD_REQUEST - val BadGateway = HttpResponseStatus.BAD_GATEWAY - val NotFound = HttpResponseStatus.NOT_FOUND - val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR - - def badRoute(ctx: ChannelHandlerContext) = - simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) - - def simpleResponse(ctx: ChannelHandlerContext, - httpResponseStatus: HttpResponseStatus, - byteBufOpt: Option[ByteBuf] = None, - headers: Seq[(String, String)] = Nil, - channelFutureListenerOpt: Option[ChannelFutureListener] = None): Unit = { - - val res: FullHttpResponse = byteBufOpt match { - case None => new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus) - case Some(byteBuf) => - new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf) - } - - headers.foreach { case (k, v) => res.headers().set(k, v) } - val channelFuture = ctx.writeAndFlush(res) - - channelFutureListenerOpt match { - case None => - case Some(listener) => channelFuture.addListener(listener) - } - } - - def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = { - var closeOpt = CloseOpt - var headers = mutable.ArrayBuilder.make[(String, String)] - - headers += (Names.CONTENT_TYPE -> ApplicationJson) - result.headers.foreach(headers += _) - - if (HttpHeaders.isKeepAlive(req)) { - headers += (Names.CONNECTION -> HttpHeaders.Values.KEEP_ALIVE) - closeOpt = None - } - - result.body onComplete { - case Success(json) => - val duration = System.currentTimeMillis() - startedAt - - val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}" - logger.info(log) - - val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8) - - headers += (Names.CONTENT_LENGTH -> buf.readableBytes().toString) - - simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result()) - case Failure(ex) => ex match { - case e: BadQueryException => - logger.error(s"{$requestBody}, ${e.getMessage}", e) - val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) - case e: Exception => - logger.error(s"${requestBody}, ${e.getMessage}", e) - val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8) - simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) - } - } - } - - override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = { - val uri = req.getUri - val startedAt = System.currentTimeMillis() - - req.getMethod match { - case HttpMethod.GET => - uri match { - case "/health_check.html" => - if (NettyServer.isHealthy) { - val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt) - } else { - simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt) - } - - case s if s.startsWith("/graphs/getEdge/") => - // src, tgt, label, dir - val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4) - val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) - val result = s2rest.checkEdges(params) - toResponse(ctx, req, params, result, startedAt) - case _ => badRoute(ctx) - } - - case HttpMethod.PUT => - if (uri.startsWith("/health_check/")) { - val newHealthCheck = uri.split("/").last.toBoolean - NettyServer.isHealthy = newHealthCheck - val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8) - simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt) - } else badRoute(ctx) - - case HttpMethod.POST => - val body = req.content.toString(CharsetUtil.UTF_8) - - val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey))) - toResponse(ctx, req, Json.parse(body), result, startedAt) - - case _ => - simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) - } - } - - override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { - cause.printStackTrace() - logger.error(s"exception on query.", cause) - simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) - } -} - -// Simple http server -object NettyServer extends App { - /** should be same with Boostrap.onStart on play */ - - val numOfThread = Runtime.getRuntime.availableProcessors() - val threadPool = Executors.newFixedThreadPool(numOfThread) - val ec = ExecutionContext.fromExecutor(threadPool) - - val config = ConfigFactory.load() - val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get - - // init s2graph with config - val s2graph = new Graph(config)(ec) - val rest = new RestHandler(s2graph)(ec) - - val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get - var isHealthy = config.getBooleanWithFallback("app.health.on", true) - - logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") - - // Configure the server. - val bossGroup: EventLoopGroup = new NioEventLoopGroup(1) - val workerGroup: EventLoopGroup = new NioEventLoopGroup() - - try { - val b: ServerBootstrap = new ServerBootstrap() - b.option(ChannelOption.SO_BACKLOG, Int.box(2048)) - - b.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel]) - .handler(new LoggingHandler(LogLevel.INFO)) - .childHandler(new ChannelInitializer[SocketChannel] { - override def initChannel(ch: SocketChannel) { - val p = ch.pipeline() - p.addLast(new HttpServerCodec()) - p.addLast(new HttpObjectAggregator(65536)) - p.addLast(new S2RestHandler(rest)(ec)) - } - }) - - logger.info(s"Listening for HTTP on /0.0.0.0:$port") - val ch: Channel = b.bind(port).sync().channel() - ch.closeFuture().sync() - - } finally { - bossGroup.shutdownGracefully() - workerGroup.shutdownGracefully() - s2graph.shutdown() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala ---------------------------------------------------------------------- diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala new file mode 100644 index 0000000..6669c9c --- /dev/null +++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala @@ -0,0 +1,200 @@ +package org.apache.s2graph.rest.netty + +import java.util.concurrent.Executors + +import com.typesafe.config.ConfigFactory +import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.{ByteBuf, Unpooled} +import io.netty.channel._ +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.handler.codec.http.HttpHeaders._ +import io.netty.handler.codec.http._ +import io.netty.handler.logging.{LogLevel, LoggingHandler} +import io.netty.util.CharsetUtil +import org.apache.s2graph.core.GraphExceptions.BadQueryException +import org.apache.s2graph.core.mysqls.Experiment +import org.apache.s2graph.core.rest.RestHandler +import org.apache.s2graph.core.rest.RestHandler.HandlerResult +import org.apache.s2graph.core.utils.Extensions._ +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{Graph, JSONParser, PostProcess} +import play.api.libs.json._ + +import scala.collection.mutable +import scala.concurrent.ExecutionContext +import scala.io.Source +import scala.util.{Failure, Success, Try} + +class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser { + val ApplicationJson = "application/json" + + val Ok = HttpResponseStatus.OK + val CloseOpt = Option(ChannelFutureListener.CLOSE) + val BadRequest = HttpResponseStatus.BAD_REQUEST + val BadGateway = HttpResponseStatus.BAD_GATEWAY + val NotFound = HttpResponseStatus.NOT_FOUND + val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR + + def badRoute(ctx: ChannelHandlerContext) = + simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) + + def simpleResponse(ctx: ChannelHandlerContext, + httpResponseStatus: HttpResponseStatus, + byteBufOpt: Option[ByteBuf] = None, + headers: Seq[(String, String)] = Nil, + channelFutureListenerOpt: Option[ChannelFutureListener] = None): Unit = { + + val res: FullHttpResponse = byteBufOpt match { + case None => new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus) + case Some(byteBuf) => + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf) + } + + headers.foreach { case (k, v) => res.headers().set(k, v) } + val channelFuture = ctx.writeAndFlush(res) + + channelFutureListenerOpt match { + case None => + case Some(listener) => channelFuture.addListener(listener) + } + } + + def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = { + var closeOpt = CloseOpt + var headers = mutable.ArrayBuilder.make[(String, String)] + + headers += (Names.CONTENT_TYPE -> ApplicationJson) + result.headers.foreach(headers += _) + + if (HttpHeaders.isKeepAlive(req)) { + headers += (Names.CONNECTION -> HttpHeaders.Values.KEEP_ALIVE) + closeOpt = None + } + + result.body onComplete { + case Success(json) => + val duration = System.currentTimeMillis() - startedAt + + val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}" + logger.info(log) + + val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8) + + headers += (Names.CONTENT_LENGTH -> buf.readableBytes().toString) + + simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result()) + case Failure(ex) => ex match { + case e: BadQueryException => + logger.error(s"{$requestBody}, ${e.getMessage}", e) + val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8) + simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) + case e: Exception => + logger.error(s"${requestBody}, ${e.getMessage}", e) + val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8) + simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result()) + } + } + } + + override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = { + val uri = req.getUri + val startedAt = System.currentTimeMillis() + + req.getMethod match { + case HttpMethod.GET => + uri match { + case "/health_check.html" => + if (NettyServer.isHealthy) { + val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8) + simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt) + } else { + simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt) + } + + case s if s.startsWith("/graphs/getEdge/") => + // src, tgt, label, dir + val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4) + val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)) + val result = s2rest.checkEdges(params) + toResponse(ctx, req, params, result, startedAt) + case _ => badRoute(ctx) + } + + case HttpMethod.PUT => + if (uri.startsWith("/health_check/")) { + val newHealthCheck = uri.split("/").last.toBoolean + NettyServer.isHealthy = newHealthCheck + val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8) + simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt) + } else badRoute(ctx) + + case HttpMethod.POST => + val body = req.content.toString(CharsetUtil.UTF_8) + + val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey))) + toResponse(ctx, req, Json.parse(body), result, startedAt) + + case _ => + simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) + } + } + + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + cause.printStackTrace() + logger.error(s"exception on query.", cause) + simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt) + } +} + +// Simple http server +object NettyServer extends App { + /** should be same with Boostrap.onStart on play */ + + val numOfThread = Runtime.getRuntime.availableProcessors() + val threadPool = Executors.newFixedThreadPool(numOfThread) + val ec = ExecutionContext.fromExecutor(threadPool) + + val config = ConfigFactory.load() + val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get + + // init s2graph with config + val s2graph = new Graph(config)(ec) + val rest = new RestHandler(s2graph)(ec) + + val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get + var isHealthy = config.getBooleanWithFallback("app.health.on", true) + + logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") + + // Configure the server. + val bossGroup: EventLoopGroup = new NioEventLoopGroup(1) + val workerGroup: EventLoopGroup = new NioEventLoopGroup() + + try { + val b: ServerBootstrap = new ServerBootstrap() + b.option(ChannelOption.SO_BACKLOG, Int.box(2048)) + + b.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel]) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer[SocketChannel] { + override def initChannel(ch: SocketChannel) { + val p = ch.pipeline() + p.addLast(new HttpServerCodec()) + p.addLast(new HttpObjectAggregator(65536)) + p.addLast(new S2RestHandler(rest)(ec)) + } + }) + + logger.info(s"Listening for HTTP on /0.0.0.0:$port") + val ch: Channel = b.bind(port).sync().channel() + ch.closeFuture().sync() + + } finally { + bossGroup.shutdownGracefully() + workerGroup.shutdownGracefully() + s2graph.shutdown() + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/Bootstrap.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala deleted file mode 100644 index 6ce3ac4..0000000 --- a/s2rest_play/app/Bootstrap.scala +++ /dev/null @@ -1,81 +0,0 @@ -package com.kakao.s2graph.rest - -import java.util.concurrent.Executors - -import actors.QueueActor -import com.kakao.s2graph.core.rest._ -import com.kakao.s2graph.core.utils.logger -import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph} -import config.Config -import controllers.{ApplicationController} -import play.api.Application -import play.api.mvc.{WithFilters, _} -import play.filters.gzip.GzipFilter - -import scala.concurrent.{ExecutionContext, Future} -import scala.io.Source -import scala.util.Try - -object Global extends WithFilters(new GzipFilter()) { - var s2graph: Graph = _ - var storageManagement: Management = _ - var s2parser: RequestParser = _ - var s2rest: RestHandler = _ - - // Application entry point - override def onStart(app: Application) { - ApplicationController.isHealthy = false - - val numOfThread = Runtime.getRuntime.availableProcessors() - val threadPool = Executors.newFixedThreadPool(numOfThread) - val ec = ExecutionContext.fromExecutor(threadPool) - - val config = Config.conf.underlying - - // init s2graph with config - s2graph = new Graph(config)(ec) - storageManagement = new Management(s2graph) - s2parser = new RequestParser(s2graph.config) // merged config - s2rest = new RestHandler(s2graph)(ec) - - QueueActor.init(s2graph) - - if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) { - ExceptionHandler.apply(config) - } - - val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true) - ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get - - ApplicationController.isHealthy = defaultHealthOn - logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}") - } - - override def onStop(app: Application) { - QueueActor.shutdown() - - if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) { - ExceptionHandler.shutdown() - } - - /** - * shutdown hbase client for flush buffers. - */ - s2graph.shutdown() - } - - override def onError(request: RequestHeader, ex: Throwable): Future[Result] = { - logger.error(s"onError => ip:${request.remoteAddress}, request:${request}", ex) - Future.successful(Results.InternalServerError) - } - - override def onHandlerNotFound(request: RequestHeader): Future[Result] = { - logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, request:${request}") - Future.successful(Results.NotFound) - } - - override def onBadRequest(request: RequestHeader, error: String): Future[Result] = { - logger.error(s"onBadRequest => ip:${request.remoteAddress}, request:$request, error:$error") - Future.successful(Results.BadRequest(error)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/actors/QueueActor.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/actors/QueueActor.scala b/s2rest_play/app/actors/QueueActor.scala deleted file mode 100644 index 74bc65d..0000000 --- a/s2rest_play/app/actors/QueueActor.scala +++ /dev/null @@ -1,92 +0,0 @@ -package actors - -import java.util.concurrent.TimeUnit - -import actors.Protocol.FlushAll -import akka.actor._ -import com.kakao.s2graph.core.ExceptionHandler._ -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.utils.logger -import config.Config -import play.api.Play.current -import play.api.libs.concurrent.Akka - -import scala.collection.mutable -import scala.concurrent.duration.Duration - -/** - * Created by shon on 9/2/15. - */ -object Protocol { - - case object Flush - - case object FlushAll - -} - -object QueueActor { - /** we are throttling down here so fixed number of actor to constant */ - var router: ActorRef = _ - - // Akka.system.actorOf(props(), name = "queueActor") - def init(s2: Graph) = { - router = Akka.system.actorOf(props(s2)) - } - - def shutdown() = { - router ! FlushAll - Akka.system.shutdown() - Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2) - } - - def props(s2: Graph): Props = Props(classOf[QueueActor], s2) -} - -class QueueActor(s2: Graph) extends Actor with ActorLogging { - - import Protocol._ - - implicit val ec = context.system.dispatcher - // logger.error(s"QueueActor: $self") - val queue = mutable.Queue.empty[GraphElement] - var queueSize = 0L - val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE - val timeUnitInMillis = 10 - val rateLimitTimeStep = 1000 / timeUnitInMillis - val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep - - - context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush) - - override def receive: Receive = { - case element: GraphElement => - - if (queueSize > maxQueueSize) { - ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None)) - } else { - queueSize += 1L - queue.enqueue(element) - } - - case Flush => - val elementsToFlush = - if (queue.size < rateLimit) queue.dequeueAll(_ => true) - else (0 until rateLimit).map(_ => queue.dequeue()) - - val flushSize = elementsToFlush.size - - queueSize -= elementsToFlush.length - s2.mutateElements(elementsToFlush) - - if (flushSize > 0) { - logger.info(s"flush: $flushSize, $queueSize") - } - - case FlushAll => - s2.mutateElements(queue) - context.stop(self) - - case _ => logger.error("unknown protocol") - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/config/Config.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/config/Config.scala b/s2rest_play/app/config/Config.scala deleted file mode 100644 index 98b87c5..0000000 --- a/s2rest_play/app/config/Config.scala +++ /dev/null @@ -1,41 +0,0 @@ -package config - -import play.api.Play - -object Config { - // HBASE - lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost") - - - // HBASE CLIENT - lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort - lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000) - lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3) - - // PHASE - lazy val PHASE = conf.getString("phase").getOrElse("dev") - lazy val conf = Play.current.configuration - - // CACHE - lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600) - lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000) - - //KAFKA - lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost") - lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0) - lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}" - lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async" - lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed" - - // is query or write - lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true) - lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true) - - - // query limit per step - lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300) - - // local queue actor - lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("local.queue.actor.max.queue.size").getOrElse(10000) - lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("local.queue.actor.rate.limit").getOrElse(1000) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/config/CounterConfig.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/config/CounterConfig.scala b/s2rest_play/app/config/CounterConfig.scala deleted file mode 100644 index 2569d55..0000000 --- a/s2rest_play/app/config/CounterConfig.scala +++ /dev/null @@ -1,10 +0,0 @@ -package config - -/** - * Created by hsleep([email protected]) on 15. 9. 3.. - */ -object CounterConfig { - // kafka - lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}" - lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}" -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/controllers/AdminController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/AdminController.scala b/s2rest_play/app/controllers/AdminController.scala deleted file mode 100644 index bb73c40..0000000 --- a/s2rest_play/app/controllers/AdminController.scala +++ /dev/null @@ -1,424 +0,0 @@ -package controllers - -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.rest.RequestParser -import com.kakao.s2graph.core.utils.logger -import play.api.mvc -import play.api.mvc.{Action, Controller} -import play.api.libs.json._ -import play.api.libs.functional.syntax._ - -import scala.util.{Failure, Success, Try} - -object AdminController extends Controller { - - import ApplicationController._ - private val management: Management = com.kakao.s2graph.rest.Global.storageManagement - private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser - - /** - * admin message formatter - * @tparam T - */ - trait AdminMessageFormatter[T] { - def toJson(msg: T): JsValue - } - - object AdminMessageFormatter { - implicit def jsValueToJson[T <: JsValue] = new AdminMessageFormatter[T] { - def toJson(js: T) = js - } - - implicit val stringToJson = new AdminMessageFormatter[String] { - def toJson(js: String) = Json.obj("message" -> js) - } - } - - def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) = { - val formatter = implicitly[AdminMessageFormatter[T]] - f(formatter.toJson(message)) - } - - /** - * ok response - * @param message - * @tparam T - * @return - */ - def ok[T: AdminMessageFormatter](message: T) = { - val formatter = implicitly[AdminMessageFormatter[T]] - Ok(formatter.toJson(message)).as(applicationJsonHeader) - } - - /** - * bad request response - * @param message - * @tparam T - * @return - */ - def bad[T: AdminMessageFormatter](message: T) = { - val formatter = implicitly[AdminMessageFormatter[T]] - BadRequest(formatter.toJson(message)).as(applicationJsonHeader) - } - - /** - * not found response - * @param message - * @tparam T - * @return - */ - def notFound[T: AdminMessageFormatter](message: T) = { - val formatter = implicitly[AdminMessageFormatter[T]] - NotFound(formatter.toJson(message)).as(applicationJsonHeader) - } - - private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match { - case Success(m) => - val ret = callback(m) - logger.info(ret.toString) - ok(ret) - case Failure(error) => - logger.error(error.getMessage, error) - error match { - case JsResultException(e) => bad(JsError.toFlatJson(e)) - case _ => bad(error.getMessage) - } - } - - def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match { - case Some(m) => ok(callback(m)) - case None => notFound("not found") - } - - /** - * load all model cache - * @return - */ - def loadCache() = Action { request => - val startTs = System.currentTimeMillis() - - if (!ApplicationController.isHealthy) { - loadCacheInner() - } - - ok(s"${System.currentTimeMillis() - startTs}") - } - - def loadCacheInner() = { - Service.findAll() - ServiceColumn.findAll() - Label.findAll() - LabelMeta.findAll() - LabelIndex.findAll() - ColumnMeta.findAll() - } - - /** - * read - */ - - /** - * get service info - * @param serviceName - * @return - */ - def getService(serviceName: String) = Action { request => - val serviceOpt = Management.findService(serviceName) - optionResponse(serviceOpt)(_.toJson) - } - - /** - * get label info - * @param labelName - * @return - */ - def getLabel(labelName: String) = Action { request => - val labelOpt = Management.findLabel(labelName) - optionResponse(labelOpt)(_.toJson) - } - - /** - * get all labels of service - * @param serviceName - * @return - */ - def getLabels(serviceName: String) = Action { request => - Service.findByName(serviceName) match { - case None => notFound(s"Service $serviceName not found") - case Some(service) => - val src = Label.findBySrcServiceId(service.id.get) - val tgt = Label.findByTgtServiceId(service.id.get) - - ok(Json.obj("from" -> src.map(_.toJson), "to" -> tgt.map(_.toJson))) - } - } - - /** - * get service columns - * @param serviceName - * @param columnName - * @return - */ - def getServiceColumn(serviceName: String, columnName: String) = Action { request => - val serviceColumnOpt = for { - service <- Service.findByName(serviceName) - serviceColumn <- ServiceColumn.find(service.id.get, columnName, useCache = false) - } yield serviceColumn - - optionResponse(serviceColumnOpt)(_.toJson) - } - - /** - * create - */ - - /** - * create service - * @return - */ - def createService() = Action(parse.json) { request => - val serviceTry = createServiceInner(request.body) - tryResponse(serviceTry)(_.toJson) - } - - - def createServiceInner(jsValue: JsValue) = { - val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue) - management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) - } - - /** - * create label - * @return - */ - def createLabel() = Action(parse.json) { request => - val ret = createLabelInner(request.body) - tryResponse(ret)(_.toJson) - } - - - def createLabelInner(json: JsValue) = for { - labelArgs <- requestParser.toLabelElements(json) - label <- (management.createLabel _).tupled(labelArgs) - } yield label - - /** - * add index - * @return - */ - def addIndex() = Action(parse.json) { request => - val ret = addIndexInner(request.body) - tryResponse(ret)(_.label + " is updated") - } - - def addIndexInner(json: JsValue) = for { - (labelName, indices) <- requestParser.toIndexElements(json) - label <- Management.addIndex(labelName, indices) - } yield label - - /** - * create service column - * @return - */ - def createServiceColumn() = Action(parse.json) { request => - val serviceColumnTry = createServiceColumnInner(request.body) - tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) => Json.obj("metas" -> columns.map(_.toJson)) } - } - - def createServiceColumnInner(jsValue: JsValue) = for { - (serviceName, columnName, columnType, props) <- requestParser.toServiceColumnElements(jsValue) - serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props) - } yield serviceColumn - - /** - * delete - */ - - /** - * delete label - * @param labelName - * @return - */ - def deleteLabel(labelName: String) = Action { request => - val deleteLabelTry = deleteLabelInner(labelName) - tryResponse(deleteLabelTry)(labelName => labelName + " is deleted") - } - - def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName) - - /** - * delete servieColumn - * @param serviceName - * @param columnName - * @return - */ - def deleteServiceColumn(serviceName: String, columnName: String) = Action { request => - val serviceColumnTry = deleteServiceColumnInner(serviceName, columnName) - tryResponse(serviceColumnTry)(columnName => columnName + " is deleted") - } - - def deleteServiceColumnInner(serviceName: String, columnName: String) = - Management.deleteColumn(serviceName, columnName) - - /** - * update - */ - - /** - * add Prop to label - * @param labelName - * @return - */ - def addProp(labelName: String) = Action(parse.json) { request => - val labelMetaTry = addPropInner(labelName, request.body) - tryResponse(labelMetaTry)(_.toJson) - } - - def addPropInner(labelName: String, js: JsValue) = for { - prop <- requestParser.toPropElements(js) - labelMeta <- Management.addProp(labelName, prop) - } yield labelMeta - - /** - * add prop to serviceColumn - * @param serviceName - * @param columnName - * @return - */ - def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request => - addServiceColumnPropInner(serviceName, columnName)(request.body) match { - case None => bad(s"can`t find service with $serviceName or can`t find serviceColumn with $columnName") - case Some(m) => Ok(m.toJson).as(applicationJsonHeader) - } - } - - def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = { - for { - service <- Service.findByName(serviceName) - serviceColumn <- ServiceColumn.find(service.id.get, columnName) - prop <- requestParser.toPropElements(js).toOption - } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.defaultValue) - } - } - - /** - * add props to serviceColumn - * @param serviecName - * @param columnName - * @return - */ - def addServiceColumnProps(serviecName: String, columnName: String) = Action(parse.json) { request => - val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject]) - val newProps = for { - js <- jsObjs - newProp <- addServiceColumnPropInner(serviecName, columnName)(js) - } yield newProp - ok(s"${newProps.size} is added.") - } - - /** - * copy label - * @param oldLabelName - * @param newLabelName - * @return - */ - def copyLabel(oldLabelName: String, newLabelName: String) = Action { request => - val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName)) - tryResponse(copyTry)(_.label + "created") - } - - /** - * rename label - * @param oldLabelName - * @param newLabelName - * @return - */ - def renameLabel(oldLabelName: String, newLabelName: String) = Action { request => - Label.findByName(oldLabelName) match { - case None => NotFound.as(applicationJsonHeader) - case Some(label) => - Management.updateLabelName(oldLabelName, newLabelName) - ok(s"Label was updated") - } - } - - /** - * update HTable for a label - * @param labelName - * @param newHTableName - * @return - */ - def updateHTable(labelName: String, newHTableName: String) = Action { request => - val updateTry = Management.updateHTable(labelName, newHTableName) - tryResponse(updateTry)(_.toString + " label(s) updated.") - } - - - case class HTableParams(cluster: String, hTableName: String, - preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) { - - override def toString(): String = { - s"""HtableParams - |-- cluster : $cluster - |-- hTableName : $hTableName - |-- preSplitSize : $preSplitSize - |-- hTableTTL : $hTableTTL - |-- compressionAlgorithm : $compressionAlgorithm - |""".stripMargin - } - } - - implicit object HTableParamsJsonConverter extends Format[HTableParams] { - def reads(json: JsValue): JsResult[HTableParams] = ( - (__ \ "cluster").read[String] and - (__ \ "hTableName").read[String] and - (__ \ "preSplitSize").read[Int] and - (__ \ "hTableTTL").readNullable[Int] and - (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json) - - def writes(o: HTableParams): JsValue = Json.obj( - "cluster" -> o.cluster, - "hTableName" -> o.hTableName, - "preSplitSize" -> o.preSplitSize, - "hTableTTL" -> o.hTableTTL, - "compressionAlgorithm" -> o.compressionAlgorithm - ) - } - - implicit object JsErrorJsonWriter extends Writes[JsError] { - def writes(o: JsError): JsValue = Json.obj( - "errors" -> JsArray( - o.errors.map { - case (path, validationErrors) => Json.obj( - "path" -> Json.toJson(path.toString()), - "validationErrors" -> JsArray(validationErrors.map(validationError => Json.obj( - "message" -> JsString(validationError.message), - "args" -> JsArray(validationError.args.map(_ match { - case x: Int => JsNumber(x) - case x => JsString(x.toString) - })) - ))) - ) - } - ) - ) - } - - def createHTable() = Action { request => - - // Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) - request.body.asJson.map(_.validate[HTableParams] match { - case JsSuccess(hTableParams, _) => { - management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), - hTableParams.preSplitSize, hTableParams.hTableTTL, - hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm)) - logger.info(hTableParams.toString()) - ok(s"HTable was created.") - } - case err@JsError(_) => bad(Json.toJson(err)) - }).getOrElse(bad("Invalid Json.")) - - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/controllers/ApplicationController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/controllers/ApplicationController.scala b/s2rest_play/app/controllers/ApplicationController.scala deleted file mode 100644 index 5f54edd..0000000 --- a/s2rest_play/app/controllers/ApplicationController.scala +++ /dev/null @@ -1,101 +0,0 @@ -package controllers - -import com.kakao.s2graph.core.GraphExceptions.BadQueryException -import com.kakao.s2graph.core.PostProcess -import com.kakao.s2graph.core.utils.logger -import play.api.libs.iteratee.Enumerator -import play.api.libs.json.{JsString, JsValue, Json} -import play.api.mvc._ - -import scala.concurrent.{ExecutionContext, Future} - -object ApplicationController extends Controller { - - var isHealthy = true - var deployInfo = "" - val applicationJsonHeader = "application/json" - - val jsonParser: BodyParser[JsValue] = controllers.s2parse.json - - val jsonText: BodyParser[String] = controllers.s2parse.jsonText - - private def badQueryExceptionResults(ex: Exception) = - Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader)) - - private def errorResults = - Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader)) - - def requestFallback(body: String): PartialFunction[Throwable, Future[Result]] = { - case e: BadQueryException => - logger.error(s"{$body}, ${e.getMessage}", e) - badQueryExceptionResults(e) - case e: Exception => - logger.error(s"${body}, ${e.getMessage}", e) - errorResults - } - - def updateHealthCheck(isHealthy: Boolean) = Action { request => - this.isHealthy = isHealthy - Ok(this.isHealthy + "\n") - } - - def healthCheck() = withHeader(parse.anyContent) { request => - if (isHealthy) Ok(deployInfo) - else NotFound - } - - def jsonResponse(json: JsValue, headers: (String, String)*) = - if (ApplicationController.isHealthy) { - Ok(json).as(applicationJsonHeader).withHeaders(headers: _*) - } else { - Result( - header = ResponseHeader(OK), - body = Enumerator(json.toString.getBytes()), - connection = HttpConnection.Close - ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*) - } - - def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = { - val duration = System.currentTimeMillis() - startedAt - val isQueryRequest = result.header.headers.contains("result_size") - val resultSize = result.header.headers.getOrElse("result_size", "-1") - - try { - val body = request.body match { - case AnyContentAsJson(jsValue) => jsValue match { - case JsString(str) => str - case _ => jsValue.toString - } - case AnyContentAsEmpty => "" - case _ => request.body.toString - } - - val str = - if (isQueryRequest) - s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}" - else - s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}" - - str - } finally { - /* pass */ - } - } - - def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) = - Action.async(bodyParser) { request => - val startedAt = System.currentTimeMillis() - block(request).map { r => - logger.info(toLogMessage(request, r)(startedAt)) - r - } - } - - def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) = - Action(bodyParser) { request: Request[A] => - val startedAt = System.currentTimeMillis() - val r = block(request) - logger.info(toLogMessage(request, r)(startedAt)) - r - } -}
