http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala
deleted file mode 100644
index 4970399..0000000
--- a/s2counter_loader/src/test/scala/s2/counter/stream/ExactCounterStreamingSpec.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-package s2.counter.stream
-
-import com.kakao.s2graph.core.GraphUtil
-import com.kakao.s2graph.core.mysqls.Label
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import play.api.libs.json.Json
-import s2.config.{S2ConfigFactory, S2CounterConfig}
-import s2.counter.core.CounterFunctions.HashMapAccumulable
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, DBModel, DefaultCounterModel}
-import s2.spark.HashMapParam
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-/**
-  * Created by hsleep([email protected]) on 2015. 11. 19..
-  */
-class ExactCounterStreamingSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  private val master = "local[2]"
-  private val appName = "exact_counter_streaming"
-  private val batchDuration = Seconds(1)
-
-  private var sc: SparkContext = _
-  private var ssc: StreamingContext = _
-
-  val admin = new CounterAdmin(S2ConfigFactory.config)
-  val graphOp = new GraphOperation(S2ConfigFactory.config)
-  val s2config = new S2CounterConfig(S2ConfigFactory.config)
-
-  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
-  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
-
-  val service = "test"
-  val action = "test_case"
-
-  override def beforeAll(): Unit = {
-    DBModel.initialize(S2ConfigFactory.config)
-
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    ssc = new StreamingContext(conf, batchDuration)
-
-    sc = ssc.sparkContext
-
-    // create test_case label
-    com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
-    if (Label.findByName(action, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-
-    // action
-    admin.deleteCounter(service, action).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "is_shared,relationship", useRank = true))
-  }
-
-  override def afterAll(): Unit = {
-    admin.deleteCounter(service, action)
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
-  "ExactCounter" should "update" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val data =
-      s"""
-        |1434534565675 $service        $action 70362200_94013572857366866      {"is_shared":"false","relationship":"FE"}       {"userId":"48255079","userIdType":"profile_id","value":"1"}
-        |1434534565675 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"48255079","userIdType":"profile_id","value":"1"}
-        |1434534566220 $service        $action 51223360_94013140590929619      {"is_shared":"false","relationship":"FE"}       {"userId":"312383","userIdType":"profile_id","value":"1"}
-        |1434534566508 $service        $action 63808459_94013420047377826      {"is_shared":"false","relationship":"FE"}       {"userId":"21968241","userIdType":"profile_id","value":"1"}
-        |1434534566210 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"6062217","userIdType":"profile_id","value":"1"}
-        |1434534566459 $service        $action 49699692_94012186431261763      {"is_shared":"false","relationship":"FE"}       {"userId":"67863471","userIdType":"profile_id","value":"1"}
-        |1434534565681 $service        $action 64556827_94012311028641810      {"is_shared":"false","relationship":"FE"}       {"userId":"19381218","userIdType":"profile_id","value":"1"}
-        |1434534565865 $service        $action 41814266_94012477588942163      {"is_shared":"false","relationship":"FE"}       {"userId":"19268547","userIdType":"profile_id","value":"1"}
-        |1434534565865 $service        $action 66697741_94007840665633458      {"is_shared":"false","relationship":"FE"}       {"userId":"19268547","userIdType":"profile_id","value":"1"}
-        |1434534566142 $service        $action 66444074_94012737377133826      {"is_shared":"false","relationship":"FE"}       {"userId":"11917195","userIdType":"profile_id","value":"1"}
-        |1434534566077 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"37709890","userIdType":"profile_id","value":"1"}
-        |1434534565938 $service        $action 40921487_94012905738975266      {"is_shared":"false","relationship":"FE"}       {"userId":"59869223","userIdType":"profile_id","value":"1"}
-        |1434534566033 $service        $action 64506628_93994707216829506      {"is_shared":"false","relationship":"FE"}       {"userId":"50375575","userIdType":"profile_id","value":"1"}
-        |1434534566451 $service        $action 40748868_94013448321919139      {"is_shared":"false","relationship":"FE"}       {"userId":"12249539","userIdType":"profile_id","value":"1"}
-        |1434534566669 $service        $action 64499956_94013227717457106      {"is_shared":"false","relationship":"FE"}       {"userId":"25167419","userIdType":"profile_id","value":"1"}
-        |1434534566669 $service        $action 66444074_94012737377133826      {"is_shared":"false","relationship":"FE"}       {"userId":"25167419","userIdType":"profile_id","value":"1"}
-        |1434534566318 $service        $action 64774665_94012837889027027      {"is_shared":"true","relationship":"F"} {"userId":"71557816","userIdType":"profile_id","value":"1"}
-        |1434534566274 $service        $action 67075480_94008509166933763      {"is_shared":"false","relationship":"FE"}       {"userId":"57931860","userIdType":"profile_id","value":"1"}
-        |1434534566659 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"19990823","userIdType":"profile_id","value":"1"}
-        |1434534566250 $service        $action 70670053_93719933175630611      {"is_shared":"true","relationship":"F"} {"userId":"68897412","userIdType":"profile_id","value":"1"}
-        |1434534566402 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"15541439","userIdType":"profile_id","value":"1"}
-        |1434534566122 $service        $action 48890741_94013463616012786      {"is_shared":"false","relationship":"FE"}       {"userId":"48040409","userIdType":"profile_id","value":"1"}
-        |1434534566055 $service        $action 64509008_94002318232678546      {"is_shared":"true","relationship":"F"} {"userId":"46532039","userIdType":"profile_id","value":"1"}
-        |1434534565994 $service        $action 66644368_94009163363033795      {"is_shared":"false","relationship":"FE"}       {"userId":"4143147","userIdType":"profile_id","value":"1"}
-        |1434534566448 $service        $action 64587644_93938555963733954      {"is_shared":"false","relationship":"FE"}       {"userId":"689042","userIdType":"profile_id","value":"1"}
-        |1434534565935 $service        $action 52812511_94012009551561315      {"is_shared":"false","relationship":"FE"}       {"userId":"35509692","userIdType":"profile_id","value":"1"}
-        |1434534566544 $service        $action 70452048_94008573197583762      {"is_shared":"false","relationship":"FE"}       {"userId":"5172421","userIdType":"profile_id","value":"1"}
-        |1434534565929 $service        $action 54547023_94013384964278435      {"is_shared":"false","relationship":"FE"}       {"userId":"33556498","userIdType":"profile_id","value":"1"}
-        |1434534566358 $service        $action 46889329_94013502934177075      {"is_shared":"false","relationship":"FE"}       {"userId":"8987346","userIdType":"profile_id","value":"1"}
-        |1434534566057 $service        $action 67075480_94008509166933763      {"is_shared":"false","relationship":"FE"}       {"userId":"35134964","userIdType":"profile_id","value":"1"}
-        |1434534566140 $service        $action 54547023_94013384964278435      {"is_shared":"false","relationship":"FE"}       {"userId":"11900315","userIdType":"profile_id","value":"1"}
-        |1434534566158 $service        $action 64639374_93888330176053635      {"is_shared":"true","relationship":"F"} {"userId":"49996643","userIdType":"profile_id","value":"1"}
-        |1434534566025 $service        $action 67265128_94009084771192002      {"is_shared":"false","relationship":"FE"}       {"userId":"37801480","userIdType":"profile_id","value":"1"}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-    val resultRdd = CounterFunctions.makeExactRdd(rdd, 2)
-    val result = resultRdd.collect().toMap
-
-    //    result.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val parsed = {
-      for {
-        line <- GraphUtil.parseString(data)
-        item <- CounterEtlItem(line).toSeq
-        ev <- CounterFunctions.exactMapper(item).toSeq
-      } yield {
-        ev
-      }
-    }
-    val parsedResult = parsed.groupBy(_._1).mapValues(values => values.map(_._2).reduce(CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L)))
-
-    //    parsedResult.foreach { case (k, v) =>
-    //      println(k, v)
-    //    }
-
-    result should not be empty
-    result should equal (parsedResult)
-
-    val itemId = "46889329_94013502934177075"
-    val key = ExactKey(DefaultCounterModel.findByServiceAction(service, action).get, itemId, checkItemType = true)
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get(ExactQualifier(TimedQualifier("t", 0), Map.empty[String, String])) should equal (Some(6L))
-
-    exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (None)
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-    resultRdd.foreachPartition { part =>
-      CounterFunctions.updateExactCounter(part.toSeq, acc)
-    }
-
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map.empty[String, String]) -> Map(ExactQualifier(TimedQualifier("t", 0), "") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map.empty[String, Set[String]]) should be (Some(expected))
-    }
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map("is_shared" -> "false")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.false") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"))) should be (Some(expected))
-    }
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map("relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "relationship.FE") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("relationship" -> Set("FE"))) should be (Some(expected))
-    }
-    Option(FetchedCountsGrouped(key, Map(
-      (IntervalUnit.TOTAL, Map("is_shared" -> "false", "relationship" -> "FE")) -> Map(ExactQualifier(TimedQualifier("t", 0), "is_shared.relationship.false.FE") -> 6l)
-    ))).foreach { expected =>
-      exactCounter.getCount(policy, itemId, Seq(IntervalUnit.TOTAL), 0, 0, Map("is_shared" -> Set("false"), "relationship" -> Set("FE"))) should be (Some(expected))
-    }
-  }
-}

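The deleted spec above checks that the streaming path (CounterFunctions.makeExactRdd) agrees with a direct fold over the parsed events. As a reference point, here is a minimal, dependency-free Scala sketch of that reduce step; Qualifier, reduceValue and aggregate are illustrative stand-ins, not the s2counter API:

object ExactCountSketch {
  type Qualifier = String // stand-in for ExactQualifier

  // mirrors CounterFunctions.reduceValue[ExactQualifier, Long](_ + _, 0L)
  def reduceValue(a: Map[Qualifier, Long], b: Map[Qualifier, Long]): Map[Qualifier, Long] =
    (a.keySet ++ b.keySet).map(q => q -> (a.getOrElse(q, 0L) + b.getOrElse(q, 0L))).toMap

  // mirrors parsed.groupBy(_._1).mapValues(_.map(_._2).reduce(reduceValue))
  def aggregate(events: Seq[(String, Map[Qualifier, Long])]): Map[String, Map[Qualifier, Long]] =
    events.groupBy(_._1).mapValues(_.map(_._2).reduce(reduceValue)).toMap

  def main(args: Array[String]): Unit = {
    val events = Seq(
      "46889329_94013502934177075" -> Map("t.0" -> 1L),
      "46889329_94013502934177075" -> Map("t.0" -> 1L),
      "70362200_94013572857366866" -> Map("t.0" -> 1L)
    )
    // the repeated item accumulates to 2, matching the spec's per-item totals
    println(aggregate(events))
  }
}
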
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala b/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala
deleted file mode 100644
index 434673d..0000000
--- a/s2counter_loader/src/test/scala/s2/counter/stream/RankingCounterStreamingSpec.scala
+++ /dev/null
@@ -1,448 +0,0 @@
-package s2.counter.stream
-
-import com.kakao.s2graph.core.mysqls.Label
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import play.api.libs.json.Json
-import s2.config.{S2ConfigFactory, S2CounterConfig}
-import s2.counter.core.CounterFunctions.HashMapAccumulable
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v2.{ExactStorageGraph, GraphOperation, RankingStorageGraph}
-import s2.helper.CounterAdmin
-import s2.models.{Counter, DBModel, DefaultCounterModel}
-import s2.spark.HashMapParam
-
-import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-/**
- * Created by hsleep([email protected]) on 15. 6. 17..
- */
-class RankingCounterStreamingSpec extends FlatSpec with BeforeAndAfterAll with Matchers {
-  private val master = "local[2]"
-  private val appName = "ranking_counter_streaming"
-  private val batchDuration = Seconds(1)
-
-  private var sc: SparkContext = _
-  private var ssc: StreamingContext = _
-
-  val admin = new CounterAdmin(S2ConfigFactory.config)
-  val graphOp = new GraphOperation(S2ConfigFactory.config)
-  val s2config = new S2CounterConfig(S2ConfigFactory.config)
-
-  val exactCounter = new ExactCounter(S2ConfigFactory.config, new ExactStorageGraph(S2ConfigFactory.config))
-  val rankingCounter = new RankingCounter(S2ConfigFactory.config, new RankingStorageGraph(S2ConfigFactory.config))
-
-  val service = "test"
-  val action = "test_case"
-  val action_base = "test_case_base"
-  val action_rate = "test_case_rate"
-  val action_rate_threshold = "test_case_rate_threshold"
-  val action_trend = "test_case_trend"
-
-  override def beforeAll(): Unit = {
-    DBModel.initialize(S2ConfigFactory.config)
-
-    val conf = new SparkConf()
-      .setMaster(master)
-      .setAppName(appName)
-
-    ssc = new StreamingContext(conf, batchDuration)
-
-    sc = ssc.sparkContext
-
-    admin.setupCounterOnGraph()
-
-    // create test_case label
-    com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
-    if (Label.findByName(action, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-    if (Label.findByName(action_base, useCache = false).isEmpty) {
-      val strJs =
-        s"""
-           |{
-           |  "label": "$action_base",
-           |  "srcServiceName": "$service",
-           |  "srcColumnName": "src",
-           |  "srcColumnType": "string",
-           |  "tgtServiceName": "$service",
-           |  "tgtColumnName": "$action",
-           |  "tgtColumnType": "string",
-           |  "indices": [
-           |  ],
-           |  "props": [
-           |  ]
-           |}
-       """.stripMargin
-      graphOp.createLabel(Json.parse(strJs))
-    }
-
-    // action
-    admin.deleteCounter(service, action).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action, Counter.ItemType.STRING, autoComb = true, "", useRank = true))
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-
-    // action_base
-    admin.deleteCounter(service, action_base).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_base, Counter.ItemType.STRING, autoComb = true, "", useRank = true))
-    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
-
-    // action_rate
-    admin.deleteCounter(service, action_rate).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_rate, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true,
-      rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id)))
-
-    // action_rate_threshold
-    admin.deleteCounter(service, action_rate_threshold).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_rate_threshold, Counter.ItemType.STRING, autoComb = true, "gender,p1", useRank = true,
-      rateActionId = Some(policy.id), rateBaseId = Some(basePolicy.id), rateThreshold = Some(3)))
-
-    // action_trend
-    admin.deleteCounter(service, action_trend).foreach {
-      case Success(v) =>
-      case Failure(ex) =>
-        println(s"$ex")
-    }
-    admin.createCounter(Counter(useFlag = true, 2, service, action_trend, Counter.ItemType.STRING, autoComb = true, "p1", useRank = true,
-      rateActionId = Some(policy.id), rateBaseId = Some(policy.id)))
- }
-
-  override def afterAll(): Unit = {
-    admin.deleteCounter(service, action)
-    admin.deleteCounter(service, action_base)
-    admin.deleteCounter(service, action_rate)
-    admin.deleteCounter(service, action_rate_threshold)
-    admin.deleteCounter(service, action_trend)
-    if (ssc != null) {
-      ssc.stop()
-    }
-  }
-
-  "RankingCounterStreaming" should "update" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-//    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
-
-    rankingCounter.ready(policy) should equal (true)
-    val data =
-      s"""
-         |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":3}]}
-         |{"success":false,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-         |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-         |{"success":true,"policyId":${policy.id},"item":"3","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-         |{"success":true,"policyId":${policy.id},"item":"4","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val result = CounterFunctions.makeRankingRdd(rdd, 2).collect().toMap
-
-    //    result.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    result should not be empty
-    val rankKey = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
-    result should contain (rankKey -> Map(
-      "1" -> RankingValue(3, 1),
-      "3" -> RankingValue(2, 2),
-      "4" -> RankingValue(1, 1)
-    ))
-
-    val key = RankingKey(policy.id, policy.version, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get("1").get should equal (RankingValue(3, 1))
-    value.get.get("2") shouldBe empty
-    value.get.get("3").get should equal (RankingValue(2, 2))
-
-    rankingCounter.ready(policy) should equal (true)
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
-    Thread.sleep(1000)
-    val rst = rankingCounter.getTopK(key)
-
-    rst should not be empty
-//    rst.get.totalScore should equal(4f)
-    rst.get.values should contain allOf(("3", 2d), ("4", 1d), ("1", 3d))
-  }
-
-//  "rate by base" >> {
-//    val data =
-//      """
-//        |{"success":true,"policyId":42,"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-//      """.stripMargin.trim
-//    val rdd = sc.parallelize(Seq(("", data)))
-//
-//    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2).collect()
-//    trxLogRdd.foreach { log =>
-//      CounterFunctions.rateBaseRankingMapper(log) must not be empty
-//    }
-//
-//    true must_== true
-//  }
-
-  it should "update rate ranking counter" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
-    val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate).get
-
-    // update base policy
-    val eq = ExactQualifier(TimedQualifier("M", 1433084400000l), "")
-    val exactKey = ExactKey(basePolicy, "1", checkItemType = true)
-
-    // check base item count
-    exactCounter.updateCount(basePolicy, Seq(
-      (exactKey, Map(eq -> 2l))
-    ))
-    Thread.sleep(1000)
-
-    // direct get
-    val baseCount = exactCounter.getCount(basePolicy, "1", Seq(IntervalUnit.MONTHLY), 1433084400000l, 1433084400000l, Map.empty[String, Set[String]])
-    baseCount should not be empty
-    baseCount.get should equal (FetchedCountsGrouped(exactKey, Map(
-      (eq.tq.q, Map.empty[String, String]) -> Map(eq-> 2l)
-    )))
-
-    // related get
-    val relatedCount = exactCounter.getRelatedCounts(basePolicy, Seq("1" -> Seq(eq)))
-    relatedCount should not be empty
-    relatedCount.get("1") should not be empty
-    relatedCount.get("1").get should equal (Map(eq -> 2l))
-
-    val data =
-      s"""
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"p1.1","ts":1433084400000,"value":2,"result":4}]}
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
-    trxLogRdd.count() should equal (data.trim.split('\n').length)
-
-    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
-    itemRankingRdd.foreach(println)
-
-    val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
-    result.foreach(println)
-    result should have size 3
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, 
Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // rate ranking
-    val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
-    val value = result.get(key)
-
-//    println(key, value)
-
-    value should not be empty
-    value.get.get("1") should not be empty
-    value.get.get("1").get should equal (RankingValue(1, 0))
-    value.get.get("2").get should equal (RankingValue(0.25, 0))
-
-    val key2 = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), "p1.1"))
-    val value2 = result.get(key2)
-
-//    println(key2, value2)
-
-    val values = value.map(v => (key, v)).toSeq ++ value2.map(v => (key2, v)).toSeq
-    println(s"values: $values")
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    rankingCounter.delete(key2)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(values, acc)
-    // for update graph
-    Thread.sleep(1000)
-
-    val rst = rankingCounter.getTopK(key)
-    rst should not be empty
-    rst.get.values should equal (Seq(("1", 1d), ("2", 0.25d)))
-
-    val rst2 = rankingCounter.getTopK(key2)
-    rst2 should not be empty
-    rst2.get.values should equal (Seq(("2", 0.25d)))
-  }
-
-  it should "update rate ranking counter with threshold" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val basePolicy = DefaultCounterModel.findByServiceAction(service, action_base).get
-    val ratePolicy = DefaultCounterModel.findByServiceAction(service, action_rate_threshold).get
-
-    val data =
-      s"""
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":2}]}
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1433084400000,"value":2,"result":4}]}
-        |{"success":true,"policyId":${basePolicy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1433084400000,"value":2,"result":4}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
-    trxLogRdd.count() should equal (data.trim.split('\n').length)
-
-    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
-    itemRankingRdd.foreach(println)
-
-    val result = CounterFunctions.rateRankingCount(itemRankingRdd, 2).collect().toMap.filterKeys(key => key.policyId == ratePolicy.id)
-    result.foreach(println)
-    result should have size 2
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // rate ranking
-    val key = RankingKey(ratePolicy.id, 2, ExactQualifier(TimedQualifier("M", 1433084400000L), ""))
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get("1") should be (None)
-    value.get.get("2").get should equal (RankingValue(0.25, 0))
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
-    Thread.sleep(1000)
-    val rst = rankingCounter.getTopK(key)
-
-    rst should not be empty
-    rst.get.values should equal (Seq(("2", 0.25d)))
-  }
-
-  it should "update trend ranking counter" in {
-    val policy = DefaultCounterModel.findByServiceAction(service, action).get
-    val trendPolicy = DefaultCounterModel.findByServiceAction(service, action_trend).get
-
-    val exactKey1 = ExactKey(policy, "1", checkItemType = true)
-    val exactKey2 = ExactKey(policy, "2", checkItemType = true)
-    // update old key value
-    val tq1 = TimedQualifier("M", 1435676400000l)
-    val tq2 = TimedQualifier("M", 1427814000000l)
-    exactCounter.updateCount(policy, Seq(
-      exactKey1 -> Map(ExactQualifier(tq1.add(-1), "") -> 1l, ExactQualifier(tq2.add(-1), "") -> 92l)
-    ))
-    val eq1 = ExactQualifier(tq1, "")
-    val eq2 = ExactQualifier(tq2, "")
-
-    val oldCount = exactCounter.getPastCounts(policy, Seq("1" -> Seq(eq1, eq2), "2" -> Seq(eq1, eq1.copy(dimension = "gender.M"))))
-    oldCount should not be empty
-    oldCount.get("1").get should equal(Map(eq1 -> 1l, eq2 -> 92l))
-    oldCount.get("2") should be (None)
-
-    val data =
-      s"""
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":2}]}
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"","ts":1435676400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"2","results":[{"interval":"M","dimension":"gender.M","ts":1435676400000,"value":1,"result":1}]}
-        |{"success":true,"policyId":${policy.id},"item":"1","results":[{"interval":"M","dimension":"","ts":1427814000000,"value":1,"result":92}]}
-      """.stripMargin.trim
-    //    println(data)
-    val rdd = sc.parallelize(Seq(("", data)))
-
-    //    rdd.foreachPartition { part =>
-    //      part.foreach(println)
-    //    }
-
-    val trxLogRdd = CounterFunctions.makeTrxLogRdd(rdd, 2)
-    trxLogRdd.count() should equal (data.trim.split('\n').length)
-
-    val itemRankingRdd = CounterFunctions.makeItemRankingRdd(trxLogRdd, 2)
-    itemRankingRdd.foreach(println)
-
-    val result = CounterFunctions.trendRankingCount(itemRankingRdd, 2).collect().toMap
-    result.foreach(println)
-    // dimension gender.M is ignored, because gender is not defined dimension in trend policy.
-    result should have size 2
-
-    val acc: HashMapAccumulable = sc.accumulable(MutableHashMap.empty[String, Long], "Throughput")(HashMapParam[String, Long](_ + _))
-
-    // trend ranking
-    val key = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1435676400000L), ""))
-    val value = result.get(key)
-
-    value should not be empty
-    value.get.get("1").get should equal (RankingValue(2, 0))
-    value.get.get("2").get should equal (RankingValue(1, 0))
-
-    val key2 = RankingKey(trendPolicy.id, 2, ExactQualifier(TimedQualifier("M", 1427814000000L), ""))
-    val value2 = result.get(key2)
-
-    value2 should not be empty
-    value2.get.get("1").get should equal (RankingValue(1, 0))
-
-    // delete, update and get
-    rankingCounter.delete(key)
-    Thread.sleep(1000)
-    CounterFunctions.updateRankingCounter(Seq((key, value.get)), acc)
-    Thread.sleep(1000)
-    val rst = rankingCounter.getTopK(key)
-
-    rst should not be empty
-    rst.get.values should equal (Seq("1" -> 2, "2" -> 1))
-  }
-}

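The rate assertions in the deleted spec above reduce to rate = action count / base count, with items dropped when the base-side count falls under the configured rateThreshold (item "2" above: 1 / 4 = 0.25). A simplified, self-contained Scala sketch of that rule; rates and the plain maps are illustrative stand-ins for CounterFunctions.rateRankingCount, not the actual API:

object RateRankingSketch {
  // rate = action count / base count; drop items whose base count is
  // below the optional threshold (cf. rateThreshold = Some(3) above)
  def rates(actionCounts: Map[String, Long],
            baseCounts: Map[String, Long],
            threshold: Option[Long] = None): Map[String, Double] =
    baseCounts.collect {
      case (item, base) if threshold.forall(base >= _) && actionCounts.contains(item) =>
        item -> actionCounts(item).toDouble / base
    }

  def main(args: Array[String]): Unit = {
    val action = Map("1" -> 2L, "2" -> 1L)
    val base = Map("2" -> 4L)              // item "1" has no base count, so no rate
    println(rates(action, base))           // Map(2 -> 0.25)
    println(rates(action, base, Some(3L))) // base 4 >= 3, so "2" still survives
  }
}
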
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_netty/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/resources/application.conf b/s2rest_netty/src/main/resources/application.conf
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_netty/src/main/scala/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/Server.scala b/s2rest_netty/src/main/scala/Server.scala
deleted file mode 100644
index 16477b1..0000000
--- a/s2rest_netty/src/main/scala/Server.scala
+++ /dev/null
@@ -1,202 +0,0 @@
-package com.kakao.s2graph.rest.netty
-
-import java.util.Map.Entry
-import java.util.concurrent.Executors
-import java.util.function.Consumer
-
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.Experiment
-import com.kakao.s2graph.core.rest.RestHandler.HandlerResult
-import com.kakao.s2graph.core.rest._
-import com.kakao.s2graph.core.utils.Extensions._
-import com.kakao.s2graph.core.utils.logger
-import com.typesafe.config.ConfigFactory
-import io.netty.bootstrap.ServerBootstrap
-import io.netty.buffer.{ByteBuf, Unpooled}
-import io.netty.channel._
-import io.netty.channel.nio.NioEventLoopGroup
-import io.netty.channel.socket.SocketChannel
-import io.netty.channel.socket.nio.NioServerSocketChannel
-import io.netty.handler.codec.http.HttpHeaders._
-import io.netty.handler.codec.http._
-import io.netty.handler.logging.{LogLevel, LoggingHandler}
-import io.netty.util.CharsetUtil
-import play.api.libs.json._
-
-import scala.collection.mutable
-import scala.concurrent.ExecutionContext
-import scala.io.Source
-import scala.util.{Failure, Success, Try}
-
-class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser {
-  val ApplicationJson = "application/json"
-
-  val Ok = HttpResponseStatus.OK
-  val CloseOpt = Option(ChannelFutureListener.CLOSE)
-  val BadRequest = HttpResponseStatus.BAD_REQUEST
-  val BadGateway = HttpResponseStatus.BAD_GATEWAY
-  val NotFound = HttpResponseStatus.NOT_FOUND
-  val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
-
-  def badRoute(ctx: ChannelHandlerContext) =
-    simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
-
-  def simpleResponse(ctx: ChannelHandlerContext,
-                     httpResponseStatus: HttpResponseStatus,
-                     byteBufOpt: Option[ByteBuf] = None,
-                     headers: Seq[(String, String)] = Nil,
-                     channelFutureListenerOpt: Option[ChannelFutureListener] = None): Unit = {
-
-    val res: FullHttpResponse = byteBufOpt match {
-      case None => new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus)
-      case Some(byteBuf) =>
-        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf)
-    }
-
-    headers.foreach { case (k, v) => res.headers().set(k, v) }
-    val channelFuture = ctx.writeAndFlush(res)
-
-    channelFutureListenerOpt match {
-      case None =>
-      case Some(listener) => channelFuture.addListener(listener)
-    }
-  }
-
-  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
-    var closeOpt = CloseOpt
-    var headers = mutable.ArrayBuilder.make[(String, String)]
-
-    headers += (Names.CONTENT_TYPE -> ApplicationJson)
-    result.headers.foreach(headers += _)
-
-    if (HttpHeaders.isKeepAlive(req)) {
-      headers += (Names.CONNECTION -> HttpHeaders.Values.KEEP_ALIVE)
-      closeOpt = None
-    }
-
-    result.body onComplete {
-      case Success(json) =>
-        val duration = System.currentTimeMillis() - startedAt
-
-        val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}"
-        logger.info(log)
-
-        val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8)
-
-        headers += (Names.CONTENT_LENGTH -> buf.readableBytes().toString)
-
-        simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
-      case Failure(ex) => ex match {
-        case e: BadQueryException =>
-          logger.error(s"{$requestBody}, ${e.getMessage}", e)
-          val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result())
-        case e: Exception =>
-          logger.error(s"${requestBody}, ${e.getMessage}", e)
-          val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result())
-      }
-    }
-  }
-
-  override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
-    val uri = req.getUri
-    val startedAt = System.currentTimeMillis()
-
-    req.getMethod match {
-      case HttpMethod.GET =>
-        uri match {
-          case "/health_check.html" =>
-            if (NettyServer.isHealthy) {
-              val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
-              simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
-            } else {
-              simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
-            }
-
-          case s if s.startsWith("/graphs/getEdge/") =>
-            // src, tgt, label, dir
-            val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
-            val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
-            val result = s2rest.checkEdges(params)
-            toResponse(ctx, req, params, result, startedAt)
-          case _ => badRoute(ctx)
-        }
-
-      case HttpMethod.PUT =>
-        if (uri.startsWith("/health_check/")) {
-          val newHealthCheck = uri.split("/").last.toBoolean
-          NettyServer.isHealthy = newHealthCheck
-          val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
-          simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
-        } else badRoute(ctx)
-
-      case HttpMethod.POST =>
-        val body = req.content.toString(CharsetUtil.UTF_8)
-
-        val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey)))
-        toResponse(ctx, req, Json.parse(body), result, startedAt)
-
-      case _ =>
-        simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
-    }
-  }
-
-  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
-    cause.printStackTrace()
-    logger.error(s"exception on query.", cause)
-    simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
-  }
-}
-
-// Simple http server
-object NettyServer extends App {
-  /** should be same with Boostrap.onStart on play */
-
-  val numOfThread = Runtime.getRuntime.availableProcessors()
-  val threadPool = Executors.newFixedThreadPool(numOfThread)
-  val ec = ExecutionContext.fromExecutor(threadPool)
-
-  val config = ConfigFactory.load()
-  val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
-
-  // init s2graph with config
-  val s2graph = new Graph(config)(ec)
-  val rest = new RestHandler(s2graph)(ec)
-
-  val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
-  var isHealthy = config.getBooleanWithFallback("app.health.on", true)
-
-  logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
-
-  // Configure the server.
-  val bossGroup: EventLoopGroup = new NioEventLoopGroup(1)
-  val workerGroup: EventLoopGroup = new NioEventLoopGroup()
-
-  try {
-    val b: ServerBootstrap = new ServerBootstrap()
-    b.option(ChannelOption.SO_BACKLOG, Int.box(2048))
-
-    b.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
-      .handler(new LoggingHandler(LogLevel.INFO))
-      .childHandler(new ChannelInitializer[SocketChannel] {
-        override def initChannel(ch: SocketChannel) {
-          val p = ch.pipeline()
-          p.addLast(new HttpServerCodec())
-          p.addLast(new HttpObjectAggregator(65536))
-          p.addLast(new S2RestHandler(rest)(ec))
-        }
-      })
-
-    logger.info(s"Listening for HTTP on /0.0.0.0:$port")
-    val ch: Channel = b.bind(port).sync().channel()
-    ch.closeFuture().sync()
-
-  } finally {
-    bossGroup.shutdownGracefully()
-    workerGroup.shutdownGracefully()
-    s2graph.shutdown()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
new file mode 100644
index 0000000..6669c9c
--- /dev/null
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -0,0 +1,200 @@
+package org.apache.s2graph.rest.netty
+
+import java.util.concurrent.Executors
+
+import com.typesafe.config.ConfigFactory
+import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.{ByteBuf, Unpooled}
+import io.netty.channel._
+import io.netty.channel.nio.NioEventLoopGroup
+import io.netty.channel.socket.SocketChannel
+import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.handler.codec.http.HttpHeaders._
+import io.netty.handler.codec.http._
+import io.netty.handler.logging.{LogLevel, LoggingHandler}
+import io.netty.util.CharsetUtil
+import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.mysqls.Experiment
+import org.apache.s2graph.core.rest.RestHandler
+import org.apache.s2graph.core.rest.RestHandler.HandlerResult
+import org.apache.s2graph.core.utils.Extensions._
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{Graph, JSONParser, PostProcess}
+import play.api.libs.json._
+
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+import scala.io.Source
+import scala.util.{Failure, Success, Try}
+
+class S2RestHandler(s2rest: RestHandler)(implicit ec: ExecutionContext) extends SimpleChannelInboundHandler[FullHttpRequest] with JSONParser {
+  val ApplicationJson = "application/json"
+
+  val Ok = HttpResponseStatus.OK
+  val CloseOpt = Option(ChannelFutureListener.CLOSE)
+  val BadRequest = HttpResponseStatus.BAD_REQUEST
+  val BadGateway = HttpResponseStatus.BAD_GATEWAY
+  val NotFound = HttpResponseStatus.NOT_FOUND
+  val InternalServerError = HttpResponseStatus.INTERNAL_SERVER_ERROR
+
+  def badRoute(ctx: ChannelHandlerContext) =
+    simpleResponse(ctx, BadGateway, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+
+  def simpleResponse(ctx: ChannelHandlerContext,
+                     httpResponseStatus: HttpResponseStatus,
+                     byteBufOpt: Option[ByteBuf] = None,
+                     headers: Seq[(String, String)] = Nil,
+                     channelFutureListenerOpt: Option[ChannelFutureListener] = None): Unit = {
+
+    val res: FullHttpResponse = byteBufOpt match {
+      case None => new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus)
+      case Some(byteBuf) =>
+        new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf)
+    }
+
+    headers.foreach { case (k, v) => res.headers().set(k, v) }
+    val channelFuture = ctx.writeAndFlush(res)
+
+    channelFutureListenerOpt match {
+      case None =>
+      case Some(listener) => channelFuture.addListener(listener)
+    }
+  }
+
+  def toResponse(ctx: ChannelHandlerContext, req: FullHttpRequest, requestBody: JsValue, result: HandlerResult, startedAt: Long) = {
+    var closeOpt = CloseOpt
+    var headers = mutable.ArrayBuilder.make[(String, String)]
+
+    headers += (Names.CONTENT_TYPE -> ApplicationJson)
+    result.headers.foreach(headers += _)
+
+    if (HttpHeaders.isKeepAlive(req)) {
+      headers += (Names.CONNECTION -> HttpHeaders.Values.KEEP_ALIVE)
+      closeOpt = None
+    }
+
+    result.body onComplete {
+      case Success(json) =>
+        val duration = System.currentTimeMillis() - startedAt
+
+        val log = s"${req.getMethod} ${req.getUri} took ${duration} ms 200 ${s2rest.calcSize(json)} ${requestBody}"
+        logger.info(log)
+
+        val buf: ByteBuf = Unpooled.copiedBuffer(json.toString, CharsetUtil.UTF_8)
+
+        headers += (Names.CONTENT_LENGTH -> buf.readableBytes().toString)
+
+        simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = closeOpt, headers = headers.result())
+      case Failure(ex) => ex match {
+        case e: BadQueryException =>
+          logger.error(s"{$requestBody}, ${e.getMessage}", e)
+          val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.badRequestResults(e).toString, CharsetUtil.UTF_8)
+          simpleResponse(ctx, Ok, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result())
+        case e: Exception =>
+          logger.error(s"${requestBody}, ${e.getMessage}", e)
+          val buf: ByteBuf = Unpooled.copiedBuffer(PostProcess.emptyResults.toString, CharsetUtil.UTF_8)
+          simpleResponse(ctx, InternalServerError, byteBufOpt = Option(buf), channelFutureListenerOpt = CloseOpt, headers = headers.result())
+      }
+    }
+  }
+
+  override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
+    val uri = req.getUri
+    val startedAt = System.currentTimeMillis()
+
+    req.getMethod match {
+      case HttpMethod.GET =>
+        uri match {
+          case "/health_check.html" =>
+            if (NettyServer.isHealthy) {
+              val healthCheckMsg = Unpooled.copiedBuffer(NettyServer.deployInfo, CharsetUtil.UTF_8)
+              simpleResponse(ctx, Ok, byteBufOpt = Option(healthCheckMsg), channelFutureListenerOpt = CloseOpt)
+            } else {
+              simpleResponse(ctx, NotFound, channelFutureListenerOpt = CloseOpt)
+            }
+
+          case s if s.startsWith("/graphs/getEdge/") =>
+            // src, tgt, label, dir
+            val Array(srcId, tgtId, labelName, direction) = s.split("/").takeRight(4)
+            val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+            val result = s2rest.checkEdges(params)
+            toResponse(ctx, req, params, result, startedAt)
+          case _ => badRoute(ctx)
+        }
+
+      case HttpMethod.PUT =>
+        if (uri.startsWith("/health_check/")) {
+          val newHealthCheck = uri.split("/").last.toBoolean
+          NettyServer.isHealthy = newHealthCheck
+          val newHealthCheckMsg = Unpooled.copiedBuffer(NettyServer.isHealthy.toString, CharsetUtil.UTF_8)
+          simpleResponse(ctx, Ok, byteBufOpt = Option(newHealthCheckMsg), channelFutureListenerOpt = CloseOpt)
+        } else badRoute(ctx)
+
+      case HttpMethod.POST =>
+        val body = req.content.toString(CharsetUtil.UTF_8)
+
+        val result = s2rest.doPost(uri, body, Option(req.headers().get(Experiment.impressionKey)))
+        toResponse(ctx, req, Json.parse(body), result, startedAt)
+
+      case _ =>
+        simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+    }
+  }
+
+  override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
+    cause.printStackTrace()
+    logger.error(s"exception on query.", cause)
+    simpleResponse(ctx, BadRequest, byteBufOpt = None, channelFutureListenerOpt = CloseOpt)
+  }
+}
+
+// Simple http server
+object NettyServer extends App {
+  /** should be same with Boostrap.onStart on play */
+
+  val numOfThread = Runtime.getRuntime.availableProcessors()
+  val threadPool = Executors.newFixedThreadPool(numOfThread)
+  val ec = ExecutionContext.fromExecutor(threadPool)
+
+  val config = ConfigFactory.load()
+  val port = Try(config.getInt("http.port")).recover { case _ => 9000 }.get
+
+  // init s2graph with config
+  val s2graph = new Graph(config)(ec)
+  val rest = new RestHandler(s2graph)(ec)
+
+  val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
+  var isHealthy = config.getBooleanWithFallback("app.health.on", true)
+
+  logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
+
+  // Configure the server.
+  val bossGroup: EventLoopGroup = new NioEventLoopGroup(1)
+  val workerGroup: EventLoopGroup = new NioEventLoopGroup()
+
+  try {
+    val b: ServerBootstrap = new ServerBootstrap()
+    b.option(ChannelOption.SO_BACKLOG, Int.box(2048))
+
+    b.group(bossGroup, workerGroup).channel(classOf[NioServerSocketChannel])
+      .handler(new LoggingHandler(LogLevel.INFO))
+      .childHandler(new ChannelInitializer[SocketChannel] {
+        override def initChannel(ch: SocketChannel) {
+          val p = ch.pipeline()
+          p.addLast(new HttpServerCodec())
+          p.addLast(new HttpObjectAggregator(65536))
+          p.addLast(new S2RestHandler(rest)(ec))
+        }
+      })
+
+    logger.info(s"Listening for HTTP on /0.0.0.0:$port")
+    val ch: Channel = b.bind(port).sync().channel()
+    ch.closeFuture().sync()
+
+  } finally {
+    bossGroup.shutdownGracefully()
+    workerGroup.shutdownGracefully()
+    s2graph.shutdown()
+  }
+}
+

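For a quick manual check of the routes defined above, a small client sketch; it assumes the server is running locally on the default port (9000 when http.port is unset) and uses only the standard library:

object HealthCheckClient {
  def main(args: Array[String]): Unit = {
    // GET /health_check.html returns the ./release_info contents with 200
    // while NettyServer.isHealthy is true; after PUT /health_check/false
    // the route answers 404 and fromURL throws FileNotFoundException.
    val body = scala.io.Source.fromURL("http://localhost:9000/health_check.html").mkString
    println(body)
  }
}
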
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala
deleted file mode 100644
index 6ce3ac4..0000000
--- a/s2rest_play/app/Bootstrap.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-package com.kakao.s2graph.rest
-
-import java.util.concurrent.Executors
-
-import actors.QueueActor
-import com.kakao.s2graph.core.rest._
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph}
-import config.Config
-import controllers.{ApplicationController}
-import play.api.Application
-import play.api.mvc.{WithFilters, _}
-import play.filters.gzip.GzipFilter
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.io.Source
-import scala.util.Try
-
-object Global extends WithFilters(new GzipFilter()) {
-  var s2graph: Graph = _
-  var storageManagement: Management = _
-  var s2parser: RequestParser = _
-  var s2rest: RestHandler = _
-
-  // Application entry point
-  override def onStart(app: Application) {
-    ApplicationController.isHealthy = false
-
-    val numOfThread = Runtime.getRuntime.availableProcessors()
-    val threadPool = Executors.newFixedThreadPool(numOfThread)
-    val ec = ExecutionContext.fromExecutor(threadPool)
-
-    val config = Config.conf.underlying
-
-    // init s2graph with config
-    s2graph = new Graph(config)(ec)
-    storageManagement = new Management(s2graph)
-    s2parser = new RequestParser(s2graph.config) // merged config
-    s2rest = new RestHandler(s2graph)(ec)
-
-    QueueActor.init(s2graph)
-
-    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
-      ExceptionHandler.apply(config)
-    }
-
-    val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true)
-    ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
-
-    ApplicationController.isHealthy = defaultHealthOn
-    logger.info(s"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
-  }
-
-  override def onStop(app: Application) {
-    QueueActor.shutdown()
-
-    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
-      ExceptionHandler.shutdown()
-    }
-
-    /**
-     * shutdown hbase client for flush buffers.
-     */
-    s2graph.shutdown()
-  }
-
-  override def onError(request: RequestHeader, ex: Throwable): Future[Result] = {
-    logger.error(s"onError => ip:${request.remoteAddress}, request:${request}", ex)
-    Future.successful(Results.InternalServerError)
-  }
-
-  override def onHandlerNotFound(request: RequestHeader): Future[Result] = {
-    logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, request:${request}")
-    Future.successful(Results.NotFound)
-  }
-
-  override def onBadRequest(request: RequestHeader, error: String): Future[Result] = {
-    logger.error(s"onBadRequest => ip:${request.remoteAddress}, request:$request, error:$error")
-    Future.successful(Results.BadRequest(error))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/actors/QueueActor.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/actors/QueueActor.scala b/s2rest_play/app/actors/QueueActor.scala
deleted file mode 100644
index 74bc65d..0000000
--- a/s2rest_play/app/actors/QueueActor.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package actors
-
-import java.util.concurrent.TimeUnit
-
-import actors.Protocol.FlushAll
-import akka.actor._
-import com.kakao.s2graph.core.ExceptionHandler._
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.utils.logger
-import config.Config
-import play.api.Play.current
-import play.api.libs.concurrent.Akka
-
-import scala.collection.mutable
-import scala.concurrent.duration.Duration
-
-/**
- * Created by shon on 9/2/15.
- */
-object Protocol {
-
-  case object Flush
-
-  case object FlushAll
-
-}
-
-object QueueActor {
-  /** we are throttling down here so fixed number of actor to constant */
-  var router: ActorRef = _
-
-  //    Akka.system.actorOf(props(), name = "queueActor")
-  def init(s2: Graph) = {
-    router = Akka.system.actorOf(props(s2))
-  }
-
-  def shutdown() = {
-    router ! FlushAll
-    Akka.system.shutdown()
-    Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2)
-  }
-
-  def props(s2: Graph): Props = Props(classOf[QueueActor], s2)
-}
-
-class QueueActor(s2: Graph) extends Actor with ActorLogging {
-
-  import Protocol._
-
-  implicit val ec = context.system.dispatcher
-  //  logger.error(s"QueueActor: $self")
-  val queue = mutable.Queue.empty[GraphElement]
-  var queueSize = 0L
-  val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE
-  val timeUnitInMillis = 10
-  val rateLimitTimeStep = 1000 / timeUnitInMillis
-  val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep
-
-
-  context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush)
-
-  override def receive: Receive = {
-    case element: GraphElement =>
-
-      if (queueSize > maxQueueSize) {
-        ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None))
-      } else {
-        queueSize += 1L
-        queue.enqueue(element)
-      }
-
-    case Flush =>
-      val elementsToFlush =
-        if (queue.size < rateLimit) queue.dequeueAll(_ => true)
-        else (0 until rateLimit).map(_ => queue.dequeue())
-
-      val flushSize = elementsToFlush.size
-
-      queueSize -= flushSize
-      s2.mutateElements(elementsToFlush)
-
-      if (flushSize > 0) {
-        logger.info(s"flush: $flushSize, $queueSize")
-      }
-
-    case FlushAll =>
-      s2.mutateElements(queue)
-      context.stop(self)
-
-    case _ => logger.error("unknown protocol")
-  }
-}
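
The rate limiting above works in ticks: with the defaults
(LOCAL_QUEUE_ACTOR_RATE_LIMIT = 1000, timeUnitInMillis = 10) there are
1000 / 10 = 100 Flush ticks per second, and each tick drains at most
1000 / 100 = 10 elements, so the sustained rate stays at the configured
1000 elements per second. A standalone sketch of the same dequeue policy
(plain mutable.Queue, no Akka; names here are illustrative):

    import scala.collection.mutable

    // Drain at most perTick elements per Flush tick, mirroring QueueActor.
    def drain[A](queue: mutable.Queue[A], perTick: Int): Seq[A] =
      if (queue.size < perTick) queue.dequeueAll(_ => true)
      else (0 until perTick).map(_ => queue.dequeue())

    val q = mutable.Queue(1, 2, 3, 4, 5)
    assert(drain(q, 2) == Seq(1, 2))     // over the limit: take exactly perTick
    assert(drain(q, 10) == Seq(3, 4, 5)) // under the limit: flush everything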

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/config/Config.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/config/Config.scala 
b/s2rest_play/app/config/Config.scala
deleted file mode 100644
index 98b87c5..0000000
--- a/s2rest_play/app/config/Config.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-package config
-
-import play.api.Play
-
-object Config {
-  // HBASE
-  lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost")
-
-
-  // HBASE CLIENT
-  lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort
-  lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000)
-  lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3)
-
-  // PHASE
-  lazy val PHASE = conf.getString("phase").getOrElse("dev")
-  lazy val conf = Play.current.configuration
-
-  // CACHE
-  lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600)
-  lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000)
-
-  // KAFKA
-  lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost")
-  lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0)
-  lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
-  lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
-  lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
-
-  // is query or write
-  lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)
-  lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true)
-
-
-  // query limit per step
-  lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300)
-
-  // local queue actor
-  lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("local.queue.actor.max.queue.size").getOrElse(10000)
-  lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("local.queue.actor.rate.limit").getOrElse(1000)
-}
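
Every entry above follows one pattern: a typed getter on Play's configuration
plus getOrElse for the default, evaluated lazily so the Play application is
already started when conf is first touched. A minimal sketch of the same
fallback pattern over a plain Typesafe Config (the key names are only examples):

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.parseString("query.hard.limit = 500")

    // Typed read with a default, analogous to conf.getInt(path).getOrElse(default).
    def getInt(path: String, default: Int): Int =
      if (conf.hasPath(path)) conf.getInt(path) else default

    assert(getInt("query.hard.limit", 300) == 500)   // configured value wins
    assert(getInt("cache.max.size", 10000) == 10000) // falls back to the default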

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/config/CounterConfig.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/config/CounterConfig.scala 
b/s2rest_play/app/config/CounterConfig.scala
deleted file mode 100644
index 2569d55..0000000
--- a/s2rest_play/app/config/CounterConfig.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package config
-
-/**
- * Created by hsleep([email protected]) on 15. 9. 3..
- */
-object CounterConfig {
-  // kafka
-  lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}"
-  lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}"
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/AdminController.scala 
b/s2rest_play/app/controllers/AdminController.scala
deleted file mode 100644
index bb73c40..0000000
--- a/s2rest_play/app/controllers/AdminController.scala
+++ /dev/null
@@ -1,424 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.rest.RequestParser
-import com.kakao.s2graph.core.utils.logger
-import play.api.mvc
-import play.api.mvc.{Action, Controller}
-import play.api.libs.json._
-import play.api.libs.functional.syntax._
-
-import scala.util.{Failure, Success, Try}
-
-object AdminController extends Controller {
-
-  import ApplicationController._
-  private val management: Management = com.kakao.s2graph.rest.Global.storageManagement
-  private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser
-
-  /**
-   * admin message formatter
-   * @tparam T
-   */
-  trait AdminMessageFormatter[T] {
-    def toJson(msg: T): JsValue
-  }
-
-  object AdminMessageFormatter {
-    implicit def jsValueToJson[T <: JsValue] = new AdminMessageFormatter[T] {
-      def toJson(js: T) = js
-    }
-
-    implicit val stringToJson = new AdminMessageFormatter[String] {
-      def toJson(js: String) = Json.obj("message" -> js)
-    }
-  }
-
-  def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    f(formatter.toJson(message))
-  }
-
-  /**
-   * ok response
-   * @param message
-   * @tparam T
-   * @return
-   */
-  def ok[T: AdminMessageFormatter](message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    Ok(formatter.toJson(message)).as(applicationJsonHeader)
-  }
-
-  /**
-   * bad request response
-   * @param message
-   * @tparam T
-   * @return
-   */
-  def bad[T: AdminMessageFormatter](message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    BadRequest(formatter.toJson(message)).as(applicationJsonHeader)
-  }
-
-  /**
-   * not found response
-   * @param message
-   * @tparam T
-   * @return
-   */
-  def notFound[T: AdminMessageFormatter](message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    NotFound(formatter.toJson(message)).as(applicationJsonHeader)
-  }
-
-  private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match {
-    case Success(m) =>
-      val ret = callback(m)
-      logger.info(ret.toString)
-      ok(ret)
-    case Failure(error) =>
-      logger.error(error.getMessage, error)
-      error match {
-        case JsResultException(e) => bad(JsError.toFlatJson(e))
-        case _ => bad(error.getMessage)
-      }
-  }
-
-  def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match {
-    case Some(m) => ok(callback(m))
-    case None => notFound("not found")
-  }
-
-  /**
-   * load all model cache
-   * @return
-   */
-  def loadCache() = Action { request =>
-    val startTs = System.currentTimeMillis()
-
-    if (!ApplicationController.isHealthy) {
-      loadCacheInner()
-    }
-
-    ok(s"${System.currentTimeMillis() - startTs}")
-  }
-
-  def loadCacheInner() = {
-    Service.findAll()
-    ServiceColumn.findAll()
-    Label.findAll()
-    LabelMeta.findAll()
-    LabelIndex.findAll()
-    ColumnMeta.findAll()
-  }
-
-  /**
-   * read
-   */
-
-  /**
-   * get service info
-   * @param serviceName
-   * @return
-   */
-  def getService(serviceName: String) = Action { request =>
-    val serviceOpt = Management.findService(serviceName)
-    optionResponse(serviceOpt)(_.toJson)
-  }
-
-  /**
-   * get label info
-   * @param labelName
-   * @return
-   */
-  def getLabel(labelName: String) = Action { request =>
-    val labelOpt = Management.findLabel(labelName)
-    optionResponse(labelOpt)(_.toJson)
-  }
-
-  /**
-   * get all labels of service
-   * @param serviceName
-   * @return
-   */
-  def getLabels(serviceName: String) = Action { request =>
-    Service.findByName(serviceName) match {
-      case None => notFound(s"Service $serviceName not found")
-      case Some(service) =>
-        val src = Label.findBySrcServiceId(service.id.get)
-        val tgt = Label.findByTgtServiceId(service.id.get)
-
-        ok(Json.obj("from" -> src.map(_.toJson), "to" -> tgt.map(_.toJson)))
-    }
-  }
-
-  /**
-   * get service columns
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def getServiceColumn(serviceName: String, columnName: String) = Action { request =>
-    val serviceColumnOpt = for {
-      service <- Service.findByName(serviceName)
-      serviceColumn <- ServiceColumn.find(service.id.get, columnName, useCache = false)
-    } yield serviceColumn
-
-    optionResponse(serviceColumnOpt)(_.toJson)
-  }
-
-  /**
-   * create
-   */
-
-  /**
-   * create service
-   * @return
-   */
-  def createService() = Action(parse.json) { request =>
-    val serviceTry = createServiceInner(request.body)
-    tryResponse(serviceTry)(_.toJson)
-  }
-
-
-  def createServiceInner(jsValue: JsValue) = {
-    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue)
-    management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
-  }
-
-  /**
-   * create label
-   * @return
-   */
-  def createLabel() = Action(parse.json) { request =>
-    val ret = createLabelInner(request.body)
-    tryResponse(ret)(_.toJson)
-  }
-
-
-  def createLabelInner(json: JsValue) = for {
-    labelArgs <- requestParser.toLabelElements(json)
-    label <- (management.createLabel _).tupled(labelArgs)
-  } yield label
-
-  /**
-   * add index
-   * @return
-   */
-  def addIndex() = Action(parse.json) { request =>
-    val ret = addIndexInner(request.body)
-    tryResponse(ret)(_.label + " is updated")
-  }
-
-  def addIndexInner(json: JsValue) = for {
-    (labelName, indices) <- requestParser.toIndexElements(json)
-    label <- Management.addIndex(labelName, indices)
-  } yield label
-
-  /**
-   * create service column
-   * @return
-   */
-  def createServiceColumn() = Action(parse.json) { request =>
-    val serviceColumnTry = createServiceColumnInner(request.body)
-    tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) => Json.obj("metas" -> columns.map(_.toJson)) }
-  }
-
-  def createServiceColumnInner(jsValue: JsValue) = for {
-    (serviceName, columnName, columnType, props) <- requestParser.toServiceColumnElements(jsValue)
-    serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props)
-  } yield serviceColumn
-
-  /**
-   * delete
-   */
-
-  /**
-   * delete label
-   * @param labelName
-   * @return
-   */
-  def deleteLabel(labelName: String) = Action { request =>
-    val deleteLabelTry = deleteLabelInner(labelName)
-    tryResponse(deleteLabelTry)(labelName => labelName + " is deleted")
-  }
-
-  def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName)
-
-  /**
-   * delete serviceColumn
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def deleteServiceColumn(serviceName: String, columnName: String) = Action { request =>
-    val serviceColumnTry = deleteServiceColumnInner(serviceName, columnName)
-    tryResponse(serviceColumnTry)(columnName => columnName + " is deleted")
-  }
-
-  def deleteServiceColumnInner(serviceName: String, columnName: String) =
-    Management.deleteColumn(serviceName, columnName)
-
-  /**
-   * update
-   */
-
-  /**
-   * add Prop to label
-   * @param labelName
-   * @return
-   */
-  def addProp(labelName: String) = Action(parse.json) { request =>
-    val labelMetaTry = addPropInner(labelName, request.body)
-    tryResponse(labelMetaTry)(_.toJson)
-  }
-
-  def addPropInner(labelName: String, js: JsValue) = for {
-    prop <- requestParser.toPropElements(js)
-    labelMeta <- Management.addProp(labelName, prop)
-  } yield labelMeta
-
-  /**
-   * add prop to serviceColumn
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request =>
-    addServiceColumnPropInner(serviceName, columnName)(request.body) match {
-      case None => bad(s"can't find service $serviceName or serviceColumn $columnName")
-      case Some(m) => Ok(m.toJson).as(applicationJsonHeader)
-    }
-  }
-
-  def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = {
-    for {
-      service <- Service.findByName(serviceName)
-      serviceColumn <- ServiceColumn.find(service.id.get, columnName)
-      prop <- requestParser.toPropElements(js).toOption
-    } yield {
-      ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.defaultValue)
-    }
-  }
-
-  /**
-   * add props to serviceColumn
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def addServiceColumnProps(serviceName: String, columnName: String) = Action(parse.json) { request =>
-    val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject])
-    val newProps = for {
-      js <- jsObjs
-      newProp <- addServiceColumnPropInner(serviceName, columnName)(js)
-    } yield newProp
-    ok(s"${newProps.size} props added.")
-  }
-
-  /**
-   * copy label
-   * @param oldLabelName
-   * @param newLabelName
-   * @return
-   */
-  def copyLabel(oldLabelName: String, newLabelName: String) = Action { request =>
-    val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
-    tryResponse(copyTry)(_.label + " is created")
-  }
-
-  /**
-   * rename label
-   * @param oldLabelName
-   * @param newLabelName
-   * @return
-   */
-  def renameLabel(oldLabelName: String, newLabelName: String) = Action { request =>
-    Label.findByName(oldLabelName) match {
-      case None => NotFound.as(applicationJsonHeader)
-      case Some(label) =>
-        Management.updateLabelName(oldLabelName, newLabelName)
-        ok(s"Label was updated")
-    }
-  }
-
-  /**
-   * update HTable for a label
-   * @param labelName
-   * @param newHTableName
-   * @return
-   */
-  def updateHTable(labelName: String, newHTableName: String) = Action { request =>
-    val updateTry = Management.updateHTable(labelName, newHTableName)
-    tryResponse(updateTry)(_.toString + " label(s) updated.")
-  }
-
-
-  case class HTableParams(cluster: String, hTableName: String,
-    preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
-
-    override def toString(): String = {
-      s"""HtableParams
-         |-- cluster : $cluster
-         |-- hTableName : $hTableName
-         |-- preSplitSize : $preSplitSize
-         |-- hTableTTL : $hTableTTL
-         |-- compressionAlgorithm : $compressionAlgorithm
-         |""".stripMargin
-    }
-  }
-
-  implicit object HTableParamsJsonConverter extends Format[HTableParams] {
-    def reads(json: JsValue): JsResult[HTableParams] = (
-      (__ \ "cluster").read[String] and
-      (__ \ "hTableName").read[String] and
-      (__ \ "preSplitSize").read[Int] and
-      (__ \ "hTableTTL").readNullable[Int] and
-      (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
-
-    def writes(o: HTableParams): JsValue = Json.obj(
-      "cluster" -> o.cluster,
-      "hTableName" -> o.hTableName,
-      "preSplitSize" -> o.preSplitSize,
-      "hTableTTL" -> o.hTableTTL,
-      "compressionAlgorithm" -> o.compressionAlgorithm
-    )
-  }
-
-  implicit object JsErrorJsonWriter extends Writes[JsError] {
-    def writes(o: JsError): JsValue = Json.obj(
-      "errors" -> JsArray(
-        o.errors.map {
-          case (path, validationErrors) => Json.obj(
-            "path" -> Json.toJson(path.toString()),
-            "validationErrors" -> JsArray(validationErrors.map(validationError 
=> Json.obj(
-              "message" -> JsString(validationError.message),
-              "args" -> JsArray(validationError.args.map(_ match {
-                case x: Int => JsNumber(x)
-                case x => JsString(x.toString)
-              }))
-            )))
-          )
-        }
-      )
-    )
-  }
-
-  def createHTable() = Action { request =>
-
-    //    Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
-    request.body.asJson.map(_.validate[HTableParams] match {
-      case JsSuccess(hTableParams, _) => {
-        management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
-          hTableParams.preSplitSize, hTableParams.hTableTTL,
-          hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
-        logger.info(hTableParams.toString())
-        ok(s"HTable was created.")
-      }
-      case err@JsError(_) => bad(Json.toJson(err))
-    }).getOrElse(bad("Invalid Json."))
-
-  }
-}
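
AdminMessageFormatter above is a typeclass: ok, bad, and notFound accept any T
with an implicit formatter in scope, so a handler can return a JsValue as-is or
a plain String wrapped as {"message": ...}. A minimal standalone sketch of the
same pattern (play-json only, outside the controller; the names here are
illustrative):

    import play.api.libs.json.{JsValue, Json}

    trait Formatter[T] { def toJson(msg: T): JsValue }

    object Formatter {
      // Strings become {"message": ...}; JsValues pass through unchanged.
      implicit val stringFormatter: Formatter[String] = new Formatter[String] {
        def toJson(s: String) = Json.obj("message" -> s)
      }
      implicit def jsFormatter[T <: JsValue]: Formatter[T] = new Formatter[T] {
        def toJson(v: T) = v
      }
    }

    def render[T: Formatter](msg: T): JsValue = implicitly[Formatter[T]].toJson(msg)

    // render("done")             yields {"message":"done"}
    // render(Json.obj("a" -> 1)) yields {"a":1}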

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ApplicationController.scala 
b/s2rest_play/app/controllers/ApplicationController.scala
deleted file mode 100644
index 5f54edd..0000000
--- a/s2rest_play/app/controllers/ApplicationController.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core.PostProcess
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.iteratee.Enumerator
-import play.api.libs.json.{JsString, JsValue, Json}
-import play.api.mvc._
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object ApplicationController extends Controller {
-
-  var isHealthy = true
-  var deployInfo = ""
-  val applicationJsonHeader = "application/json"
-
-  val jsonParser: BodyParser[JsValue] = controllers.s2parse.json
-
-  val jsonText: BodyParser[String] = controllers.s2parse.jsonText
-
-  private def badQueryExceptionResults(ex: Exception) =
-    Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
-
-  private def errorResults =
-    Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader))
-
-  def requestFallback(body: String): PartialFunction[Throwable, Future[Result]] = {
-    case e: BadQueryException =>
-      logger.error(s"{$body}, ${e.getMessage}", e)
-      badQueryExceptionResults(e)
-    case e: Exception =>
-      logger.error(s"${body}, ${e.getMessage}", e)
-      errorResults
-  }
-
-  def updateHealthCheck(isHealthy: Boolean) = Action { request =>
-    this.isHealthy = isHealthy
-    Ok(this.isHealthy + "\n")
-  }
-
-  def healthCheck() = withHeader(parse.anyContent) { request =>
-    if (isHealthy) Ok(deployInfo)
-    else NotFound
-  }
-
-  def jsonResponse(json: JsValue, headers: (String, String)*) =
-    if (ApplicationController.isHealthy) {
-      Ok(json).as(applicationJsonHeader).withHeaders(headers: _*)
-    } else {
-      Result(
-        header = ResponseHeader(OK),
-        body = Enumerator(json.toString.getBytes()),
-        connection = HttpConnection.Close
-      ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*)
-    }
-
-  def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = {
-    val duration = System.currentTimeMillis() - startedAt
-    val isQueryRequest = result.header.headers.contains("result_size")
-    val resultSize = result.header.headers.getOrElse("result_size", "-1")
-
-    try {
-      val body = request.body match {
-        case AnyContentAsJson(jsValue) => jsValue match {
-          case JsString(str) => str
-          case _ => jsValue.toString
-        }
-        case AnyContentAsEmpty => ""
-        case _ => request.body.toString
-      }
-
-      // both branches of the original if/else were identical, so a single
-      // format string suffices regardless of isQueryRequest
-      s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}"
-    } finally {
-      /* pass */
-    }
-  }
-
-  def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) =
-    Action.async(bodyParser) { request =>
-      val startedAt = System.currentTimeMillis()
-      block(request).map { r =>
-        logger.info(toLogMessage(request, r)(startedAt))
-        r
-      }
-    }
-
-  def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) =
-    Action(bodyParser) { request: Request[A] =>
-      val startedAt = System.currentTimeMillis()
-      val r = block(request)
-      logger.info(toLogMessage(request, r)(startedAt))
-      r
-    }
-}
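
For reference, a controller would use the wrappers above roughly like this; the
endpoints themselves are hypothetical, but each request is timed and logged
through toLogMessage before the Result is returned:

    import play.api.libs.concurrent.Execution.Implicits.defaultContext
    import scala.concurrent.Future

    // Hypothetical endpoints showing the logging wrappers in use.
    def ping() = withHeader(parse.anyContent) { request =>
      Ok("pong")
    }

    def pingAsync() = withHeaderAsync(parse.anyContent) { request =>
      Future.successful(Ok("pong"))
    }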
