http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/CounterController.scala b/s2rest_play/app/controllers/CounterController.scala
deleted file mode 100644
index c2b4dc2..0000000
--- a/s2rest_play/app/controllers/CounterController.scala
+++ /dev/null
@@ -1,747 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core.ExceptionHandler
-import com.kakao.s2graph.core.ExceptionHandler.KafkaMessage
-import com.kakao.s2graph.core.mysqls.Label
-import config.CounterConfig
-import models._
-import org.apache.kafka.clients.producer.ProducerRecord
-import play.api.Play
-import play.api.libs.json.Reads._
-import play.api.libs.json._
-import play.api.mvc.{Action, Controller, Request}
-import s2.config.S2CounterConfig
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis}
-import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph}
-import s2.models.Counter.ItemType
-import s2.models.{Counter, CounterModel}
-import s2.util.{CartesianProduct, ReduceMapValue, UnitConverter}
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success, Try}
-
-/**
- * Created by hsleep([email protected]) on 15. 5. 22..
- */
-object CounterController extends Controller {
-  import play.api.libs.concurrent.Execution.Implicits.defaultContext
-
-  val config = Play.current.configuration.underlying
-  val s2config = new S2CounterConfig(config)
-
-  private val exactCounterMap = Map(
-    s2.counter.VERSION_1 -> new ExactCounter(config, new 
ExactStorageAsyncHBase(config)),
-    s2.counter.VERSION_2 -> new ExactCounter(config, new 
ExactStorageGraph(config))
-  )
-  private val rankingCounterMap = Map(
-    s2.counter.VERSION_1 -> new RankingCounter(config, new 
RankingStorageRedis(config)),
-    s2.counter.VERSION_2 -> new RankingCounter(config, new 
RankingStorageGraph(config))
-  )
-
-  private val tablePrefixMap = Map (
-    s2.counter.VERSION_1 -> "s2counter",
-    s2.counter.VERSION_2 -> "s2counter_v2"
-  )
-
-  private def exactCounter(version: Byte): ExactCounter = 
exactCounterMap(version)
-  private def rankingCounter(version: Byte): RankingCounter = 
rankingCounterMap(version)
-
-  lazy val counterModel = new CounterModel(config)
-
-  def getQueryString[T](key: String, default: String)(implicit request: 
Request[T]): String = {
-    request.getQueryString(key).getOrElse(default)
-  }
-
-  implicit val counterWrites = new Writes[Counter] {
-    override def writes(o: Counter): JsValue = Json.obj(
-      "version" -> o.version.toInt,
-      "autoComb" -> o.autoComb,
-      "dimension" -> o.dimension,
-      "useProfile" -> o.useProfile,
-      "bucketImpId" -> o.bucketImpId,
-      "useRank" -> o.useRank,
-      "intervalUnit" -> o.intervalUnit,
-      "ttl" -> o.ttl,
-      "dailyTtl" -> o.dailyTtl,
-      "rateAction" -> o.rateActionId.flatMap { actionId =>
-        counterModel.findById(actionId, useCache = false).map { actionPolicy =>
-          Json.obj("service" -> actionPolicy.service, "action" -> 
actionPolicy.action)
-        }
-      },
-      "rateBase" -> o.rateBaseId.flatMap { baseId =>
-        counterModel.findById(baseId, useCache = false).map { basePolicy =>
-          Json.obj("service" -> basePolicy.service, "action" -> 
basePolicy.action)
-        }
-      },
-      "rateThreshold" -> o.rateThreshold
-    )
-  }
-
-  def createAction(service: String, action: String) = Action(s2parse.json) { 
implicit request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case None =>
-        val body = request.body
-        val version = (body \ 
"version").asOpt[Int].map(_.toByte).getOrElse(s2.counter.VERSION_2)
-        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true)
-        val dimension = (body \ "dimension").asOpt[String].getOrElse("")
-        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false)
-        val bucketImpId = (body \ "bucketImpId").asOpt[String]
-
-        val useExact = (body \ "useExact").asOpt[Boolean].getOrElse(true)
-        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true)
-
-        val intervalUnit = (body \ "intervalUnit").asOpt[String]
-        // 2 day
-        val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60)
-        val dailyTtl = (body \ "dailyTtl").asOpt[Int]
-        val regionMultiplier = (body \ 
"regionMultiplier").asOpt[Int].getOrElse(1)
-
-        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
-        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
-        val rateThreshold = (body \ "rateThreshold").asOpt[Int]
-
-        val rateActionId = {
-          for {
-            actionMap <- rateAction
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action)
-          } yield {
-            policy.id
-          }
-        }
-        val rateBaseId = {
-          for {
-            actionMap <- rateBase
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action)
-          } yield {
-            policy.id
-          }
-        }
-
-        val hbaseTable = {
-          Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl mkString "_"
-        }
-
-        // find label
-        val itemType = Label.findByName(action, useCache = false) match {
-          case Some(label) =>
-            ItemType.withName(label.tgtColumnType.toUpperCase)
-          case None =>
-            val strItemType = (body \ 
"itemType").asOpt[String].getOrElse("STRING")
-            ItemType.withName(strItemType.toUpperCase)
-        }
-        val policy = Counter(useFlag = true, version, service, action, 
itemType, autoComb = autoComb, dimension,
-          useProfile = useProfile, bucketImpId, useRank = useRank, ttl, 
dailyTtl, Some(hbaseTable), intervalUnit,
-          rateActionId, rateBaseId, rateThreshold)
-
-        // prepare exact storage
-        exactCounter(version).prepare(policy)
-        if (useRank) {
-          // prepare ranking storage
-          rankingCounter(version).prepare(policy)
-        }
-        counterModel.createServiceAction(policy)
-        Ok(Json.toJson(Map("msg" -> s"created $service/$action")))
-      case Some(policy) =>
-        Ok(Json.toJson(Map("msg" -> s"already exist $service/$action")))
-    }
-  }
-
-  def getAction(service: String, action: String) = Action { request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(policy) =>
-        Ok(Json.toJson(policy))
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-
-  def updateAction(service: String, action: String) = Action(s2parse.json) { 
request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(oldPolicy) =>
-        val body = request.body
-        val autoComb = (body \ 
"autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb)
-        val dimension = (body \ 
"dimension").asOpt[String].getOrElse(oldPolicy.dimension)
-        val useProfile = (body \ 
"useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile)
-        val bucketImpId = (body \ "bucketImpId").asOpt[String] match {
-          case Some(s) => Some(s)
-          case None => oldPolicy.bucketImpId
-        }
-
-        val useRank = (body \ 
"useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank)
-
-        val intervalUnit = (body \ "intervalUnit").asOpt[String] match {
-          case Some(s) => Some(s)
-          case None => oldPolicy.intervalUnit
-        }
-
-        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
-        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
-        val rateThreshold = (body \ "rateThreshold").asOpt[Int] match {
-          case Some(i) => Some(i)
-          case None => oldPolicy.rateThreshold
-        }
-
-        val rateActionId = {
-          for {
-            actionMap <- rateAction
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action, 
useCache = false)
-          } yield {
-            policy.id
-          }
-        } match {
-          case Some(i) => Some(i)
-          case None => oldPolicy.rateActionId
-        }
-        val rateBaseId = {
-          for {
-            actionMap <- rateBase
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action, 
useCache = false)
-          } yield {
-            policy.id
-          }
-        } match {
-          case Some(i) => Some(i)
-          case None => oldPolicy.rateBaseId
-        }
-
-        // new counter
-        val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, 
oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, 
dimension,
-          useProfile = useProfile, bucketImpId, useRank = useRank, 
oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit,
-          rateActionId, rateBaseId, rateThreshold)
-
-        counterModel.updateServiceAction(policy)
-        Ok(Json.toJson(Map("msg" -> s"updated $service/$action")))
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-
-  def prepareAction(service: String, action: String) = Action(s2parse.json) { 
request =>
-    // for data migration
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(policy) =>
-        val body = request.body
-        val version = (body \ "version").as[Int].toByte
-        if (version != policy.version) {
-          // change table name
-          val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) 
++ policy.dailyTtl mkString "_"
-          val newPolicy = policy.copy(version = version, hbaseTable = 
Some(newTableName))
-          exactCounter(version).prepare(newPolicy)
-          if (newPolicy.useRank) {
-            rankingCounter(version).prepare(newPolicy)
-          }
-          Ok(Json.toJson(Map("msg" -> s"prepare storage v$version 
$service/$action")))
-        } else {
-          Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version 
$service/$action")))
-        }
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-
-  def deleteAction(service: String, action: String) = Action.apply {
-    {
-      for {
-        policy <- counterModel.findByServiceAction(service, action, useCache = 
false)
-      } yield {
-        Try {
-          exactCounter(policy.version).destroy(policy)
-          if (policy.useRank) {
-            rankingCounter(policy.version).destroy(policy)
-          }
-          counterModel.deleteServiceAction(policy)
-        } match {
-          case Success(v) =>
-            Ok(Json.toJson(Map("msg" -> s"deleted $service/$action")))
-          case Failure(ex) =>
-            throw ex
-        }
-      }
-    }.getOrElse(NotFound(Json.toJson(Map("msg" -> s"$service.$action not 
found"))))
-  }
-
-  def getExactCountAsync(service: String, action: String, item: String) = 
Action.async { implicit request =>
-    val intervalUnits = getQueryString("interval", getQueryString("step", 
"t")).split(',').toSeq
-      .map(IntervalUnit.withName)
-    val limit = getQueryString("limit", "1").toInt
-
-    val qsSum = request.getQueryString("sum")
-
-    val optFrom = 
request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
-    val optTo = 
request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
-
-    val limitOpt = (optFrom, optTo) match {
-      case (Some(_), Some(_)) =>
-        None
-      case _ =>
-        Some(limit)
-    }
-
-    // find dimension
-    lazy val dimQueryValues = request.queryString.filterKeys { k => 
k.charAt(0) == ':' }.map { case (k, v) =>
-      (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet)
-    }
-//    Logger.warn(s"$dimQueryValues")
-
-    counterModel.findByServiceAction(service, action) match {
-      case Some(policy) =>
-        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, 
optFrom, optTo)
-        val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, 
optFrom, optTo)
-        try {
-//          Logger.warn(s"$tqs $qsSum")
-          if (tqs.head.length > 1 && qsSum.isDefined) {
-            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, 
qsSum).map { jsVal =>
-              Ok(jsVal)
-            }
-          } else {
-            getExactCountToJs(policy, item, timeRange, limitOpt, 
dimQueryValues).map { jsVal =>
-              Ok(jsVal)
-            }
-          }
-        } catch {
-          case e: Exception =>
-            throw e
-//            Future.successful(BadRequest(s"$service, $action, $item"))
-        }
-      case None =>
-        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action 
not found"))))
-    }
-  }
-
-  /**
-   * [{
-   *    "service": , "action", "itemIds": [], "interval": string, "limit": 
int, "from": ts, "to": ts,
-   *    "dimensions": [{"key": list[String]}]
-   * }]
-   * @return
-   */
-  private def parseExactCountParam(jsValue: JsValue) = {
-    val service = (jsValue \ "service").as[String]
-    val action = (jsValue \ "action").as[String]
-    val itemIds = (jsValue \ "itemIds").as[Seq[String]]
-    val intervals = (jsValue \ 
"intervals").asOpt[Seq[String]].getOrElse(Seq("t")).distinct.map(IntervalUnit.withName)
-    val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1)
-    val from = (jsValue \ "from").asOpt[Long]
-    val to = (jsValue \ "to").asOpt[Long]
-    val sum = (jsValue \ "sum").asOpt[String]
-    val dimensions = {
-      for {
-        dimension <- (jsValue \ 
"dimensions").asOpt[Seq[JsObject]].getOrElse(Nil)
-        (k, vs) <- dimension.fields
-      } yield {
-        k -> vs.as[Seq[String]].toSet
-      }
-    }.toMap
-    (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
-  }
-
-  def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request 
=>
-    val jsValue = request.body
-    try {
-      val futures = {
-        for {
-          jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil)
-          (service, action, itemIds, intervalUnits, limit, from, to, 
dimQueryValues, qsSum) = parseExactCountParam(jsObject)
-          optFrom = from.map(UnitConverter.toMillis)
-          optTo = to.map(UnitConverter.toMillis)
-          timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, 
optFrom, optTo)
-          policy <- counterModel.findByServiceAction(service, action).toSeq
-          item <- itemIds
-        } yield {
-          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, 
optFrom, optTo)
-          val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, 
optFrom, optTo)
-          val limitOpt = (optFrom, optTo) match {
-            case (Some(_), Some(_)) =>
-              None
-            case _ =>
-              Some(limit)
-          }
-
-//          Logger.warn(s"$item, $limit, $optFrom, $optTo, $qsSum, $tqs")
-
-          if (tqs.head.length > 1 && qsSum.isDefined) {
-            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
-          } else {
-            getExactCountToJs(policy, item, timeRange, limitOpt, 
dimQueryValues)
-          }
-        }
-      }
-      Future.sequence(futures).map { rets =>
-        Ok(Json.toJson(rets))
-      }
-    } catch {
-      case e: Exception =>
-        throw e
-//        Future.successful(BadRequest(s"$jsValue"))
-    }
-  }
-
-  private [controllers] def fetchedToResult(fetchedCounts: 
FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = {
-    for {
-      ((interval, dimKeyValues), values) <- fetchedCounts.intervalWithCountMap
-    } yield {
-      val counterItems = {
-        val sortedItems = values.toSeq.sortBy { case (eq, v) => -eq.tq.ts }
-        val limited = limitOpt match {
-          case Some(limit) => sortedItems.take(limit)
-          case None => sortedItems
-        }
-        for {
-          (eq, value) <- limited
-        } yield {
-          ExactCounterItem(eq.tq.ts, value, value.toDouble)
-        }
-      }
-      ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems)
-    }
-  }.toSeq
-
-  private def decayedToResult(decayedCounts: DecayedCounts): 
Seq[ExactCounterIntervalItem] = {
-    for {
-      (eq, score) <- decayedCounts.qualifierWithCountMap
-    } yield {
-      ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, 
Seq(ExactCounterItem(eq.tq.ts, score.toLong, score)))
-    }
-  }.toSeq
-
-  private def getExactCountToJs(policy: Counter,
-                                item: String,
-                                timeRange: Seq[(TimedQualifier, 
TimedQualifier)],
-                                limitOpt: Option[Int],
-                                dimQueryValues: Map[String, Set[String]]): 
Future[JsValue] = {
-    exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, 
dimQueryValues).map { seq =>
-      val items = {
-        for {
-          fetched <- seq
-        } yield {
-          fetchedToResult(fetched, limitOpt)
-        }
-      }.flatten
-      Json.toJson(ExactCounterResult(ExactCounterResultMeta(policy.service, 
policy.action, item), items))
-    }
-  }
-
-  private def getDecayedCountToJs(policy: Counter,
-                                  item: String,
-                                  timeRange: Seq[(TimedQualifier, 
TimedQualifier)],
-                                  dimQueryValues: Map[String, Set[String]],
-                                  qsSum: Option[String]): Future[JsValue] = {
-    exactCounter(policy.version).getDecayedCountsAsync(policy, Seq(item), 
timeRange, dimQueryValues, qsSum).map { seq =>
-      val decayedCounts = seq.head
-      val meta = ExactCounterResultMeta(policy.service, policy.action, 
decayedCounts.exactKey.itemKey)
-      val intervalItems = decayedToResult(decayedCounts)
-      Json.toJson(ExactCounterResult(meta, intervalItems))
-    }
-  }
-
-  def getRankingCountAsync(service: String, action: String) = Action.async { 
implicit request =>
-    lazy val intervalUnits = getQueryString("interval", getQueryString("step", 
"t")).split(',').toSeq
-      .map(IntervalUnit.withName)
-    lazy val limit = getQueryString("limit", "1").toInt
-    lazy val kValue = getQueryString("k", "10").toInt
-
-    lazy val qsSum = request.getQueryString("sum")
-
-    lazy val optFrom = 
request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
-    lazy val optTo = 
request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
-
-    // find dimension
-    lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) 
== ':' }.map { case (k, v) =>
-      (k.substring(1), v.mkString(",").split(',').toList)
-    }
-
-    val dimensions = {
-      for {
-        values <- CartesianProduct(dimensionMap.values.toList).toSeq
-      } yield {
-        dimensionMap.keys.zip(values).toMap
-      }
-    }
-
-    counterModel.findByServiceAction(service, action) match {
-      case Some(policy) =>
-        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, 
optTo)
-        val dimKeys = {
-          for {
-            dimension <- dimensions
-          } yield {
-            dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, 
ExactQualifier(tq, dimension)))
-          }
-        }
-
-        // if tqs has only 1 tq, do not apply sum function
-        try {
-          val rankResult = {
-            if (tqs.length > 1 && qsSum.isDefined) {
-              getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum)
-            } else {
-              // no summary
-              Future.successful(getRankCounterResult(policy, dimKeys, kValue))
-            }
-          }
-
-          rankResult.map { result =>
-            Ok(Json.toJson(result))
-          }
-        } catch {
-          case e: UnsupportedOperationException =>
-            Future.successful(NotImplemented(Json.toJson(
-              Map("msg" -> e.getMessage)
-            )))
-          case e: Throwable =>
-            throw e
-        }
-      case None =>
-        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action 
not found"))))
-    }
-  }
-
-  def deleteRankingCount(service: String, action: String) = Action.async { 
implicit request =>
-    lazy val intervalUnits = getQueryString("interval", getQueryString("step", 
"t")).split(',').toSeq
-      .map(IntervalUnit.withName)
-    lazy val limit = getQueryString("limit", "1").toInt
-
-    // find dimension
-    lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) 
== ':' }.map { case (k, v) =>
-      (k.substring(1), v.mkString(",").split(',').toList)
-    }
-
-    val dimensions = {
-      for {
-        values <- CartesianProduct(dimensionMap.values.toList).toSeq
-      } yield {
-        dimensionMap.keys.zip(values).toMap
-      }
-    }
-
-    Future {
-      counterModel.findByServiceAction(service, action) match {
-        case Some(policy) =>
-          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit)
-          val keys = {
-            for {
-              dimension <- dimensions
-              tq <- tqs
-            } yield {
-              RankingKey(policy.id, policy.version, ExactQualifier(tq, 
dimension))
-            }
-          }
-
-          for {
-            key <- keys
-          } {
-            rankingCounter(policy.version).delete(key)
-          }
-
-          Ok(JsObject(
-            Seq(
-              ("msg", Json.toJson(s"delete ranking in $service.$action")),
-              ("items", Json.toJson({
-                for {
-                  key <- keys
-                } yield {
-                  s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}"
-                }
-              }))
-            )
-          ))
-        case None =>
-          NotFound(Json.toJson(
-            Map("msg" -> s"$service.$action not found")
-          ))
-      }
-    }
-  }
-
-  val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, 
RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1))
-
-  // change format
-  private def getDecayedCountsAsync(policy: Counter,
-                                    items: Seq[String],
-                                    timeRange: (TimedQualifier, 
TimedQualifier),
-                                    dimension: Map[String, String],
-                                    qsSum: Option[String]): 
Future[Seq[(ExactKeyTrait, Double)]] = {
-    exactCounter(policy.version).getDecayedCountsAsync(policy, items, 
Seq(timeRange), dimension.mapValues(s => Set(s)), qsSum).map { seq =>
-      for {
-        DecayedCounts(exactKey, qcMap) <- seq
-        value <- qcMap.values
-      } yield {
-        exactKey -> value
-      }
-    }
-  }
-
-  def getSumRankCounterResultAsync(policy: Counter,
-                                   dimKeys: Seq[(Map[String, String], 
Seq[RankingKey])],
-                                   kValue: Int,
-                                   qsSum: Option[String]): 
Future[RankCounterResult] = {
-    val futures = {
-      for {
-        (dimension, keys) <- dimKeys
-      } yield {
-        val tqs = keys.map(rk => rk.eq.tq)
-        val (tqFrom, tqTo) = (tqs.last, tqs.head)
-        val items = rankingCounter(policy.version).getAllItems(keys, kValue)
-//        Logger.warn(s"item count: ${items.length}")
-        val future = {
-          if (policy.isRateCounter) {
-            val actionPolicy = 
policy.rateActionId.flatMap(counterModel.findById(_)).get
-            val basePolicy = 
policy.rateBaseId.flatMap(counterModel.findById(_)).get
-
-            val futureAction = getDecayedCountsAsync(actionPolicy, items, 
(tqFrom, tqTo), dimension, qsSum).map { seq =>
-              seq.map { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> 
RateRankingValue(score, -1)
-              }.toMap
-            }
-            val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom, 
tqTo), dimension, qsSum).map { seq =>
-              seq.map { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> 
RateRankingValue(-1, score)
-              }.toMap
-            }
-            futureAction.zip(futureBase).map { case (actionScores, baseScores) 
=>
-              reduceRateRankingValue(actionScores, baseScores).map { case (k, 
rrv) =>
-//                Logger.warn(s"$k -> $rrv")
-                k -> rrv.rankingValue.score
-              }.toSeq
-            }
-          }
-          else if (policy.isTrendCounter) {
-            val actionPolicy = 
policy.rateActionId.flatMap(counterModel.findById(_)).get
-            val basePolicy = 
policy.rateBaseId.flatMap(counterModel.findById(_)).get
-
-            val futureAction = getDecayedCountsAsync(actionPolicy, items, 
(tqFrom, tqTo), dimension, qsSum).map { seq =>
-              seq.map { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> 
RateRankingValue(score, -1)
-              }.toMap
-            }
-            val futureBase = getDecayedCountsAsync(basePolicy, items, 
(tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum).map { seq =>
-              seq.map { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> 
RateRankingValue(-1, score)
-              }.toMap
-            }
-            futureAction.zip(futureBase).map { case (actionScores, baseScores) 
=>
-              reduceRateRankingValue(actionScores, baseScores).map { case (k, 
rrv) =>
-//                Logger.warn(s"$k -> $rrv")
-                k -> rrv.rankingValue.score
-              }.toSeq
-            }
-          }
-          else {
-            getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, 
qsSum)
-          }
-        }
-        future.map { keyWithScore =>
-          val ranking = keyWithScore.sortBy(-_._2).take(kValue)
-          val rankCounterItems = {
-            for {
-              idx <- ranking.indices
-              (exactKey, score) = ranking(idx)
-            } yield {
-              val realId = policy.itemType match {
-                case ItemType.BLOB => 
exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
-                  .getOrElse(throw new Exception(s"not found blob id. 
${policy.service}.${policy.action} ${exactKey.itemKey}"))
-                case _ => exactKey.itemKey
-              }
-              RankCounterItem(idx + 1, realId, score)
-            }
-          }
-
-          val eq = ExactQualifier(tqFrom, dimension)
-          RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, 
-1, rankCounterItems)
-        }
-      }
-    }
-
-    Future.sequence(futures).map { dimensionResultList =>
-      RankCounterResult(RankCounterResultMeta(policy.service, policy.action), 
dimensionResultList)
-    }
-  }
-
-  def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], 
Seq[RankingKey])], kValue: Int): RankCounterResult = {
-    val dimensionResultList = {
-      for {
-        (dimension, keys) <- dimKeys
-        key <- keys
-      } yield {
-        val rankingValue = rankingCounter(policy.version).getTopK(key, kValue)
-        val ranks = {
-          for {
-            rValue <- rankingValue.toSeq
-            idx <- rValue.values.indices
-            rank = idx + 1
-          } yield {
-            val (id, score) = rValue.values(idx)
-            val realId = policy.itemType match {
-              case ItemType.BLOB =>
-                exactCounter(policy.version)
-                  .getBlobValue(policy, id)
-                  .getOrElse(throw new Exception(s"not found blob id. 
${policy.service}.${policy.action} $id"))
-              case _ => id
-            }
-            RankCounterItem(rank, realId, score)
-          }
-        }
-        val eq = key.eq
-        val tq = eq.tq
-        RankCounterDimensionItem(tq.q.toString, tq.ts, eq.dimension, 
rankingValue.map(v => v.totalScore).getOrElse(0d), ranks)
-      }
-    }
-
-    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), 
dimensionResultList)
-  }
-
-  type Record = ProducerRecord[String, String]
-
-  def incrementCount(service: String, action: String, item: String) = 
Action.async(s2parse.json) { request =>
-    Future {
-      /**
-       * {
-       * timestamp: Long
-       * property: {}
-       * value: Int
-       * }
-       */
-      lazy val metaMap = Map("service" -> service, "action" -> action, "item" 
-> item)
-      counterModel.findByServiceAction(service, action).map { policy =>
-        val body = request.body
-        try {
-          val ts = (body \ 
"timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString
-          val dimension = (body \ 
"dimension").asOpt[JsValue].getOrElse(Json.obj())
-          val property = (body \ 
"property").asOpt[JsValue].getOrElse(Json.obj())
-
-          val msg = List(ts, service, action, item, dimension, 
property).mkString("\t")
-
-          // produce to kafka
-          // hash partitioner by key
-          ExceptionHandler.enqueue(KafkaMessage(new 
Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)))
-
-          Ok(Json.toJson(
-            Map(
-              "meta" -> metaMap
-            )
-          ))
-        }
-        catch {
-          case e: JsResultException =>
-            BadRequest(Json.toJson(
-              Map("msg" -> s"need timestamp.")
-            ))
-        }
-      }.getOrElse {
-        NotFound(Json.toJson(
-          Map("msg" -> s"$service.$action not found")
-        ))
-      }
-    }
-  }
-}
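
A note on the serialization pattern in the file above: counterWrites is a hand-rolled play-json Writes that assembles the JSON object field by field (including the nested rateAction/rateBase lookups) rather than a macro-derived format. A minimal sketch of the same pattern, using a hypothetical trimmed-down CounterPolicy class that stands in for the much larger Counter model:

    import play.api.libs.json.{JsValue, Json, Writes}

    object CounterPolicyJson {
      // Illustrative stand-in for s2.models.Counter; fields and values are made up.
      case class CounterPolicy(service: String, action: String, version: Int, ttl: Int)

      // Same shape as CounterController.counterWrites: build the JsObject by hand.
      implicit val policyWrites: Writes[CounterPolicy] = new Writes[CounterPolicy] {
        override def writes(p: CounterPolicy): JsValue = Json.obj(
          "service" -> p.service,
          "action"  -> p.action,
          "version" -> p.version,
          "ttl"     -> p.ttl
        )
      }

      def main(args: Array[String]): Unit =
        // 2 * 24 * 60 * 60 mirrors the controller's default ttl of two days.
        println(Json.toJson(CounterPolicy("s2counter", "click", 2, 2 * 24 * 60 * 60)))
    }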

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/EdgeController.scala b/s2rest_play/app/controllers/EdgeController.scala
deleted file mode 100644
index a8e1a41..0000000
--- a/s2rest_play/app/controllers/EdgeController.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-package controllers
-
-import actors.QueueActor
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{Label}
-import com.kakao.s2graph.core.rest.RequestParser
-import com.kakao.s2graph.core.utils.logger
-import config.Config
-import org.apache.kafka.clients.producer.ProducerRecord
-import play.api.libs.json._
-import play.api.mvc.{Controller, Result}
-
-import scala.collection.Seq
-import scala.concurrent.Future
-
-object EdgeController extends Controller {
-
-  import ExceptionHandler._
-  import controllers.ApplicationController._
-  import play.api.libs.concurrent.Execution.Implicits._
-
-  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
-  private val requestParser: RequestParser = 
com.kakao.s2graph.rest.Global.s2parser
-  private def jsToStr(js: JsValue): String = js match {
-    case JsString(s) => s
-    case _ => js.toString()
-  }
-
-  def toTsv(jsValue: JsValue, op: String): String = {
-    val ts = jsToStr(jsValue \ "timestamp")
-    val from = jsToStr(jsValue \ "from")
-    val to = jsToStr(jsValue \ "to")
-    val label = jsToStr(jsValue \ "label")
-    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-
-    (jsValue \ "direction").asOpt[String] match {
-      case None => Seq(ts, op, "e", from, to, label, props).mkString("\t")
-      case Some(dir) => Seq(ts, op, "e", from, to, label, props, 
dir).mkString("\t")
-    }
-  }
-
-  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = 
false): Future[Result] = {
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-
-    else {
-      try {
-        logger.debug(s"$jsValue")
-        val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
-
-        for ((edge, orgJs) <- edges.zip(jsOrgs)) {
-          if (edge.isAsync)
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, Option(toTsv(orgJs, operation))))
-          else
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, 
edge, Option(toTsv(orgJs, operation))))
-        }
-
-        val edgesToStore = edges.filterNot(e => e.isAsync)
-
-        if (withWait) {
-          val rets = s2.mutateEdges(edgesToStore, withWait = true)
-          rets.map(Json.toJson(_)).map(jsonResponse(_))
-        } else {
-          val rets = edgesToStore.map { edge => QueueActor.router ! edge; true 
}
-          Future.successful(jsonResponse(Json.toJson(rets)))
-        }
-      } catch {
-        case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"$e"))
-        case e: Exception =>
-          logger.error(s"mutateAndPublish: $e", e)
-          Future.successful(InternalServerError(s"${e.getStackTrace}"))
-      }
-    }
-  }
-
-  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] 
= {
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-
-    logger.debug(s"$str")
-    val edgeStrs = str.split("\\n")
-
-    var vertexCnt = 0L
-    var edgeCnt = 0L
-    try {
-      val elements =
-        for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); 
element <- Graph.toGraphElement(str)) yield {
-          element match {
-            case v: Vertex => vertexCnt += 1
-            case e: Edge => edgeCnt += 1
-          }
-          if (element.isAsync) {
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(str)))
-          } else {
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, 
element, Some(str)))
-          }
-          element
-        }
-
-      //FIXME:
-      val elementsToStore = elements.filterNot(e => e.isAsync)
-      if (withWait) {
-        val rets = s2.mutateElements(elementsToStore, withWait)
-        rets.map(Json.toJson(_)).map(jsonResponse(_))
-      } else {
-        val rets = elementsToStore.map { element => QueueActor.router ! 
element; true }
-        Future.successful(jsonResponse(Json.toJson(rets)))
-      }
-    } catch {
-      case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"$e"))
-      case e: Throwable =>
-        logger.error(s"mutateAndPublish: $e", e)
-        Future.successful(InternalServerError(s"${e.getStackTrace}"))
-    }
-  }
-
-  def mutateBulk() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body, withWait = false)
-  }
-
-  def mutateBulkWithWait() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body, withWait = true)
-  }
-
-  def inserts() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert")
-  }
-
-  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert", withWait = true)
-  }
-
-  def insertsBulk() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insertBulk")
-  }
-
-  def deletes() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete")
-  }
-
-  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete", withWait = true)
-  }
-
-  def updates() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update")
-  }
-
-  def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update", withWait = true)
-  }
-
-  def increments() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment")
-  }
-
-  def incrementsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment", withWait = true)
-  }
-
-  def incrementCounts() = withHeaderAsync(jsonParser) { request =>
-    val jsValue = request.body
-    val edges = requestParser.toEdges(jsValue, "incrementCount")
-
-    s2.incrementCounts(edges, withWait = true).map { results =>
-      val json = results.map { case (isSuccess, resultCount) =>
-        Json.obj("success" -> isSuccess, "result" -> resultCount)
-      }
-
-      jsonResponse(Json.toJson(json))
-    }
-  }
-
-  def deleteAll() = withHeaderAsync(jsonParser) { request =>
-//    deleteAllInner(request.body, withWait = false)
-    deleteAllInner(request.body, withWait = true)
-  }
-
-  def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
-
-    /** logging for delete all request */
-    def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, 
direction: String, topicOpt: Option[String]) = {
-      val kafkaMessages = for {
-        id <- ids
-        label <- labels
-      } yield {
-        val tsv = Seq(ts, "deleteAll", "e", jsToStr(id), jsToStr(id), 
label.label, "{}", direction).mkString("\t")
-        val topic = topicOpt.getOrElse {
-          if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else 
Config.KAFKA_LOG_TOPIC
-        }
-
-        val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, 
tsv))
-        kafkaMsg
-      }
-
-      ExceptionHandler.enqueues(kafkaMessages)
-    }
-
-    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], 
ts: Long, vertices: Seq[Vertex]) = {
-      enqueueLogMessage(ids, labels, ts, direction, None)
-      val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, 
GraphUtil.directions(direction), ts)
-      if (withWait) {
-        future
-      } else {
-        Future.successful(true)
-      }
-    }
-
-    val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
-      val (labels, direction, ids, ts, vertices) = 
requestParser.toDeleteParam(json)
-      if (labels.isEmpty || ids.isEmpty) Future.successful(true)
-      else deleteEach(labels, direction, ids, ts, vertices)
-    }
-
-    val deleteResults = Future.sequence(deleteFutures)
-    deleteResults.map { rst =>
-      logger.debug(s"deleteAllInner: $rst")
-      Ok(s"deleted... ${rst.toString()}")
-    }
-  }
-}
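
toTsv above flattens an edge mutation into the line tryMutates attaches to the Kafka messages it enqueues: timestamp, operation, the literal "e", from, to, label, props, plus an optional trailing direction, joined by tabs. A self-contained sketch of that flattening, with plain strings standing in for the JsValue fields (names and values here are illustrative only):

    object EdgeTsvSketch {
      // Mirrors EdgeController.toTsv: a tab-separated edge line with an optional
      // direction as the last column.
      def toTsv(ts: String, op: String, from: String, to: String,
                label: String, props: String, dir: Option[String]): String = {
        val base = Seq(ts, op, "e", from, to, label, props)
        dir.fold(base)(d => base :+ d).mkString("\t")
      }

      def main(args: Array[String]): Unit = {
        println(toTsv("1449545700000", "insert", "alice", "bob", "friend", "{}", None))
        println(toTsv("1449545700000", "delete", "alice", "bob", "friend", "{}", Some("out")))
      }
    }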

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ExperimentController.scala b/s2rest_play/app/controllers/ExperimentController.scala
deleted file mode 100644
index e48b0f1..0000000
--- a/s2rest_play/app/controllers/ExperimentController.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package controllers
-
-
-import com.kakao.s2graph.core.mysqls.Experiment
-import com.kakao.s2graph.core.rest.RestHandler
-import com.kakao.s2graph.core.utils.logger
-import play.api.mvc._
-import scala.concurrent.ExecutionContext.Implicits.global
-
-object ExperimentController extends Controller {
-  private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
-
-  import ApplicationController._
-
-  def experiment(accessToken: String, experimentName: String, uuid: String) = 
withHeaderAsync(jsonText) { request =>
-    val body = request.body
-
-    val res = rest.doPost(request.uri, body, 
request.headers.get(Experiment.impressionKey))
-    res.body.map { case js =>
-      val headers = res.headers :+ ("result_size" -> 
rest.calcSize(js).toString)
-      jsonResponse(js, headers: _*)
-    } recoverWith ApplicationController.requestFallback(body)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/JsonBodyParser.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/JsonBodyParser.scala b/s2rest_play/app/controllers/JsonBodyParser.scala
deleted file mode 100644
index 4339eb4..0000000
--- a/s2rest_play/app/controllers/JsonBodyParser.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core.utils.logger
-import play.api.Play
-import play.api.libs.iteratee.{Execution, Iteratee}
-import play.api.libs.json.{JsValue, Json}
-import play.api.mvc._
-
-import scala.concurrent.Future
-import scala.util.control.NonFatal
-
-object s2parse extends BodyParsers {
-
-  import parse._
-
-  val defaultMaxTextLength = 1024 * 512
-  val defaultMaxJsonLength = 1024 * 512
-
-  def json: BodyParser[JsValue] = json(defaultMaxTextLength)
-
-  /**
-    * parseText with application/json header for Pre-Process text
-    */
-  def jsonText: BodyParser[String] = when(
-    _.contentType.exists(m => m.equalsIgnoreCase("text/json") || 
m.equalsIgnoreCase("application/json")),
-    jsonText(defaultMaxTextLength),
-    createBadResult("Expecting text/json or application/json body")
-  )
-
-  private def jsonText(maxLength: Int): BodyParser[String] = BodyParser("json, 
maxLength=" + maxLength) { request =>
-    import play.api.libs.iteratee.Execution.Implicits.trampoline
-    import play.api.libs.iteratee.Traversable
-
-    Traversable.takeUpTo[Array[Byte]](maxLength)
-      .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, 
"UTF-8")))
-      .flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
-  }
-
-  def json(maxLength: Int): BodyParser[JsValue] = when(
-    _.contentType.exists(m => m.equalsIgnoreCase("text/json") || 
m.equalsIgnoreCase("application/json")),
-    tolerantJson(maxLength),
-    createBadResult("Expecting text/json or application/json body")
-  )
-
-  def tolerantJson(maxLength: Int): BodyParser[JsValue] =
-    tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, 
bytes) =>
-      // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, 
and states that whether that's
-      // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two 
bytes. So we ignore the declared
-      // charset and don't decode, we passing the byte array as is because 
Jackson supports auto detection.
-      Json.parse(bytes)
-    }
-
-  private def tolerantBodyParser[A](name: String, maxLength: Int, 
errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] 
=
-    BodyParser(name + ", maxLength=" + maxLength) { request =>
-      import play.api.libs.iteratee.Execution.Implicits.trampoline
-      import play.api.libs.iteratee.Traversable
-
-      import scala.util.control.Exception._
-
-      val bodyParser: Iteratee[Array[Byte], Either[Result, 
Either[Future[Result], A]]] =
-        Traversable.takeUpTo[Array[Byte]](maxLength).transform(
-          Iteratee.consume[Array[Byte]]().map { bytes =>
-            allCatch[A].either {
-              parser(request, bytes)
-            }.left.map {
-              case NonFatal(e) =>
-                val txt = new String(bytes)
-                logger.error(s"$errorMessage: $txt", e)
-                createBadResult(s"$errorMessage: $e")(request)
-              case t => throw t
-            }
-          }
-        ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
-
-      bodyParser.mapM {
-        case Left(tooLarge) => Future.successful(Left(tooLarge))
-        case Right(Left(badResult)) => badResult.map(Left.apply)
-        case Right(Right(body)) => Future.successful(Right(body))
-      }
-    }
-
-  private def createBadResult(msg: String): RequestHeader => Future[Result] = 
{ request =>
-    Play.maybeApplication.map(_.global.onBadRequest(request, msg))
-      .getOrElse(Future.successful(Results.BadRequest))
-  }
-}
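
s2parse above is the body parser the other controllers in this commit plug into their actions: it only accepts text/json or application/json bodies up to 512 KB (defaultMaxTextLength) and hands the action an already-parsed JsValue. A hedged sketch of a call site in the same Play 2.x object-controller style (EchoController is illustrative and assumes it lives in the controllers package):

    import play.api.libs.json.JsValue
    import play.api.mvc.{Action, Controller}

    object EchoController extends Controller {
      // request.body is already a JsValue because the action is declared with
      // s2parse.json; oversized or non-JSON payloads are rejected by the parser.
      def echo() = Action(s2parse.json) { request =>
        val body: JsValue = request.body
        Ok(body)
      }
    }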

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/PublishController.scala b/s2rest_play/app/controllers/PublishController.scala
deleted file mode 100644
index b1495d5..0000000
--- a/s2rest_play/app/controllers/PublishController.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core.ExceptionHandler
-import config.Config
-import org.apache.kafka.clients.producer.ProducerRecord
-import play.api.mvc._
-
-import scala.concurrent.Future
-
-object PublishController extends Controller {
-
-  import ApplicationController._
-  import ExceptionHandler._
-  import play.api.libs.concurrent.Execution.Implicits._
-
-  /**
-   * never check validation on string. just redirect strings to kafka.
-   */
-  val serviceNotExistException = new RuntimeException(s"service is not created 
in s2graph. create service first.")
-
-  //  private def toService(topic: String): String = {
-  //    Service.findByName(topic).map(service => 
s"${service.serviceName}-${Config.PHASE}").getOrElse(throw 
serviceNotExistException)
-  //  }
-  def publishOnly(topic: String) = withHeaderAsync(parse.text) { request =>
-    if (!Config.IS_WRITE_SERVER) Future.successful(UNAUTHORIZED)
-    //  val kafkaTopic = toService(topic)
-    val strs = request.body.split("\n")
-    strs.foreach(str => {
-      val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, 
str)
-      //    val keyedMessage = new ProducerRecord[Key, Val](kafkaTopic, 
s"$str")
-      //        logger.debug(s"$kafkaTopic, $str")
-      ExceptionHandler.enqueue(KafkaMessage(keyedMessage))
-    })
-    Future.successful(
-      Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", 
"Keep-Alive" -> "timeout=10, max=10")
-    )
-    //    try {
-    //
-    //    } catch {
-    //      case e: Exception => Future.successful(BadRequest(e.getMessage))
-    //    }
-  }
-
-  def publish(topic: String) = publishOnly(topic)
-
-  //  def mutateBulk(topic: String) = Action.async(parse.text) { request =>
-  //    EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, 
Config.KAFKA_FAIL_TOPIC, request.body).map { result =>
-  //      result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> 
"timeout=10, max=10")
-  //    }
-  //  }
-  def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
-    EdgeController.mutateAndPublish(request.body)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/QueryController.scala b/s2rest_play/app/controllers/QueryController.scala
deleted file mode 100644
index d133370..0000000
--- a/s2rest_play/app/controllers/QueryController.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-package controllers
-
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.Experiment
-import com.kakao.s2graph.core.rest.RestHandler
-import play.api.libs.json.{Json}
-import play.api.mvc._
-
-import scala.language.postfixOps
-
-object QueryController extends Controller with JSONParser {
-
-  import ApplicationController._
-  import play.api.libs.concurrent.Execution.Implicits.defaultContext
-
-  private val rest: RestHandler = com.kakao.s2graph.rest.Global.s2rest
-
-  def delegate(request: Request[String]) = {
-    rest.doPost(request.uri, request.body, 
request.headers.get(Experiment.impressionKey)).body.map {
-      js =>
-        jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
-    } recoverWith ApplicationController.requestFallback(request.body)
-  }
-
-  def getEdges() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdgesWithGrouping() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdgesExcluded() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonText)(delegate)
-
-  def checkEdges() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdgesGrouped() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdgesGroupedExcluded() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)
-
-  def getEdge(srcId: String, tgtId: String, labelName: String, direction: 
String) =
-    withHeaderAsync(jsonText) {
-      request =>
-        val params = Json.arr(Json.obj("label" -> labelName, "direction" -> 
direction, "from" -> srcId, "to" -> tgtId))
-        rest.checkEdges(params).body.map {
-          js =>
-            jsonResponse(js, "result_size" -> rest.calcSize(js).toString)
-        } recoverWith ApplicationController.requestFallback(request.body)
-    }
-
-  def getVertices() = withHeaderAsync(jsonText)(delegate)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/TestController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/TestController.scala b/s2rest_play/app/controllers/TestController.scala
deleted file mode 100644
index 8558ae5..0000000
--- a/s2rest_play/app/controllers/TestController.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package controllers
-
-import play.api.mvc.{Action, Controller}
-import util.TestDataLoader
-
-import scala.concurrent.Future
-
-
-object TestController extends Controller {
-  import ApplicationController._
-
-  def getRandomId() = withHeader(parse.anyContent) { request =>
-    val id = TestDataLoader.randomId
-    Ok(s"${id}")
-  }
-
-  def pingAsync() = Action.async(parse.json) { request =>
-    Future.successful(Ok("Pong\n"))
-  }
-
-  def ping() = Action(parse.json) { request =>
-    Ok("Pong\n")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/VertexController.scala b/s2rest_play/app/controllers/VertexController.scala
deleted file mode 100644
index 977c2fc..0000000
--- a/s2rest_play/app/controllers/VertexController.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package controllers
-
-
-import actors.QueueActor
-import com.kakao.s2graph.core.rest.RequestParser
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{ExceptionHandler, Graph, GraphExceptions}
-import config.Config
-import play.api.libs.json.{JsValue, Json}
-import play.api.mvc.{Controller, Result}
-
-import scala.concurrent.Future
-
-object VertexController extends Controller {
-  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
-  private val requestParser: RequestParser = 
com.kakao.s2graph.rest.Global.s2parser
-
-  import ExceptionHandler._
-  import controllers.ApplicationController._
-  import play.api.libs.concurrent.Execution.Implicits._
-
-  def tryMutates(jsValue: JsValue, operation: String, serviceNameOpt: 
Option[String] = None, columnNameOpt: Option[String] = None, withWait: Boolean 
= false): Future[Result] = {
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-    else {
-      try {
-        val vertices = requestParser.toVertices(jsValue, operation, 
serviceNameOpt, columnNameOpt)
-
-        for (vertex <- vertices) {
-          if (vertex.isAsync)
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, vertex, None))
-          else
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, 
vertex, None))
-        }
-
-        //FIXME:
-        val verticesToStore = vertices.filterNot(v => v.isAsync)
-
-        if (withWait) {
-          val rets = s2.mutateVertices(verticesToStore, withWait = true)
-          rets.map(Json.toJson(_)).map(jsonResponse(_))
-        } else {
-          val rets = verticesToStore.map { vertex => QueueActor.router ! 
vertex; true }
-          Future.successful(jsonResponse(Json.toJson(rets)))
-        }
-      } catch {
-        case e: GraphExceptions.JsonParseException => 
Future.successful(BadRequest(s"e"))
-        case e: Exception =>
-          logger.error(s"[Failed] tryMutates", e)
-          Future.successful(InternalServerError(s"${e.getStackTrace}"))
-      }
-    }
-  }
-
-  def inserts() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert")
-  }
-
-  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert", withWait = true)
-  }
-
-  def insertsSimple(serviceName: String, columnName: String) = 
withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert", Some(serviceName), Some(columnName))
-  }
-
-  def deletes() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete")
-  }
-
-  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete", withWait = true)
-  }
-
-  def deletesSimple(serviceName: String, columnName: String) = 
withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete", Some(serviceName), Some(columnName))
-  }
-
-  def deletesAll() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "deleteAll")
-  }
-
-  def deletesAllSimple(serviceName: String, columnName: String) = 
withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName))
-  }
-
-}
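
VertexController.tryMutates above follows the same write path as EdgeController: elements marked async are only enqueued to Kafka and filtered out before storage, and the remaining ones are either awaited through s2.mutateVertices (withWait = true) or fired at QueueActor and acknowledged immediately with true per element. A hedged sketch of just that branching, with stubs standing in for the storage call and the actor send (store and enqueue are hypothetical stand-ins, not the project's API):

    import scala.concurrent.Future

    object MutatePatternSketch {
      // Hypothetical stand-in for s2.mutateVertices / s2.mutateEdges.
      def store(elements: Seq[String]): Future[Seq[Boolean]] =
        Future.successful(elements.map(_ => true))

      // Hypothetical stand-in for `QueueActor.router ! element`.
      def enqueue(element: String): Unit = ()

      // Same branching as tryMutates: wait on storage when withWait is set,
      // otherwise enqueue and report `true` for every element right away.
      def mutate(elements: Seq[String], withWait: Boolean): Future[Seq[Boolean]] =
        if (withWait) store(elements)
        else Future.successful(elements.map { e => enqueue(e); true })
    }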

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/models/ExactCounterItem.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/models/ExactCounterItem.scala b/s2rest_play/app/models/ExactCounterItem.scala
deleted file mode 100644
index 244c046..0000000
--- a/s2rest_play/app/models/ExactCounterItem.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package models
-
-import play.api.libs.json.{Json, Writes}
-
-/**
- * Created by alec on 15. 4. 15..
- */
-case class ExactCounterItem(ts: Long, count: Long, score: Double)
-
-case class ExactCounterIntervalItem(interval: String, dimension: Map[String, 
String], counter: Seq[ExactCounterItem])
-
-case class ExactCounterResultMeta(service: String, action: String, item: 
String)
-
-case class ExactCounterResult(meta: ExactCounterResultMeta, data: 
Seq[ExactCounterIntervalItem])
-
-object ExactCounterItem {
-  implicit val writes = new Writes[ExactCounterItem] {
-    def writes(item: ExactCounterItem) = Json.obj(
-      "ts" -> item.ts,
-      "time" -> tsFormat.format(item.ts),
-      "count" -> item.count,
-      "score" -> item.score
-    )
-  }
-  implicit val reads = Json.reads[ExactCounterItem]
-}
-
-object ExactCounterIntervalItem {
-  implicit val format = Json.format[ExactCounterIntervalItem]
-}
-
-object ExactCounterResultMeta {
-  implicit val format = Json.format[ExactCounterResultMeta]
-}
-
-object ExactCounterResult {
-  implicit val formats = Json.format[ExactCounterResult]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/models/RankCounterItem.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/models/RankCounterItem.scala b/s2rest_play/app/models/RankCounterItem.scala
deleted file mode 100644
index aaa7df7..0000000
--- a/s2rest_play/app/models/RankCounterItem.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package models
-
-import play.api.libs.json.{Json, Writes}
-
-/**
- * Created by alec on 15. 4. 15..
- */
-case class RankCounterItem(rank: Int, id: String, score: Double)
-
-case class RankCounterDimensionItem(interval: String, ts: Long, dimension: 
String, total: Double, ranks: Seq[RankCounterItem])
-
-case class RankCounterResultMeta(service: String, action: String)
-
-case class RankCounterResult(meta: RankCounterResultMeta, data: 
Seq[RankCounterDimensionItem])
-
-object RankCounterItem {
-  implicit val format = Json.format[RankCounterItem]
-}
-
-object RankCounterDimensionItem {
-  implicit val writes = new Writes[RankCounterDimensionItem] {
-    def writes(item: RankCounterDimensionItem) = Json.obj(
-      "interval" -> item.interval,
-      "ts" -> item.ts,
-      "time" -> tsFormat.format(item.ts),
-      "dimension" -> item.dimension,
-      "total" -> item.total,
-      "ranks" -> item.ranks
-    )
-  }
-  implicit val reads = Json.reads[RankCounterDimensionItem]
-}
-
-object RankCounterResultMeta {
-  implicit val format = Json.format[RankCounterResultMeta]
-}
-
-object RankCounterResult {
-  implicit val format = Json.format[RankCounterResult]
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/models/package.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/models/package.scala b/s2rest_play/app/models/package.scala
deleted file mode 100644
index 17fa8e1..0000000
--- a/s2rest_play/app/models/package.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-import java.text.SimpleDateFormat
-
-/**
- * Created by alec on 15. 4. 20..
- */
-package object models {
-  def tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-}
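
The package object deliberately uses `def` rather than `val`: java.text.SimpleDateFormat is not thread-safe, so handing out a fresh formatter per call avoids sharing one across request threads. A sketch of an alternative that is not part of this patch, assuming Java 8+: java.time's DateTimeFormatter is immutable, so a single shared instance is safe.

    import java.time.Instant
    import java.time.ZoneId
    import java.time.format.DateTimeFormatter

    object TsFormat {
      // immutable and thread-safe, so one shared instance is enough
      private val formatter =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault())

      def format(tsMillis: Long): String = formatter.format(Instant.ofEpochMilli(tsMillis))
    }

    // TsFormat.format(0L) == "1970-01-01 00:00:00" when the default zone is UTC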

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
new file mode 100644
index 0000000..8474b88
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -0,0 +1,81 @@
+package org.apache.s2graph.rest.play
+
+import java.util.concurrent.Executors
+
+import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{ExceptionHandler, Graph, Management}
+import org.apache.s2graph.rest.play.actors.QueueActor
+import org.apache.s2graph.rest.play.config.Config
+import org.apache.s2graph.rest.play.controllers.ApplicationController
+import play.api.Application
+import play.api.mvc.{WithFilters, _}
+import play.filters.gzip.GzipFilter
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.io.Source
+import scala.util.Try
+
+object Global extends WithFilters(new GzipFilter()) {
+  var s2graph: Graph = _
+  var storageManagement: Management = _
+  var s2parser: RequestParser = _
+  var s2rest: RestHandler = _
+
+  // Application entry point
+  override def onStart(app: Application) {
+    ApplicationController.isHealthy = false
+
+    val numOfThread = Runtime.getRuntime.availableProcessors()
+    val threadPool = Executors.newFixedThreadPool(numOfThread)
+    val ec = ExecutionContext.fromExecutor(threadPool)
+
+    val config = Config.conf.underlying
+
+    // init s2graph with config
+    s2graph = new Graph(config)(ec)
+    storageManagement = new Management(s2graph)
+    s2parser = new RequestParser(s2graph.config) // merged config
+    s2rest = new RestHandler(s2graph)(ec)
+
+    QueueActor.init(s2graph)
+
+    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
+      ExceptionHandler.apply(config)
+    }
+
+    val defaultHealthOn = Config.conf.getBoolean("app.health.on").getOrElse(true)
+    ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
+
+    ApplicationController.isHealthy = defaultHealthOn
+    logger.info(s"starts with num of thread: $numOfThread, 
${threadPool.getClass.getSimpleName}")
+  }
+
+  override def onStop(app: Application) {
+    QueueActor.shutdown()
+
+    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
+      ExceptionHandler.shutdown()
+    }
+
+    /**
+     * shut down the HBase client to flush its buffers.
+     */
+    s2graph.shutdown()
+  }
+
+  override def onError(request: RequestHeader, ex: Throwable): Future[Result] = {
+    logger.error(s"onError => ip:${request.remoteAddress}, 
request:${request}", ex)
+    Future.successful(Results.InternalServerError)
+  }
+
+  override def onHandlerNotFound(request: RequestHeader): Future[Result] = {
+    logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, 
request:${request}")
+    Future.successful(Results.NotFound)
+  }
+
+  override def onBadRequest(request: RequestHeader, error: String): Future[Result] = {
+    logger.error(s"onBadRequest => ip:${request.remoteAddress}, 
request:$request, error:$error")
+    Future.successful(Results.BadRequest(error))
+  }
+}
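
The deployInfo assignment in onStart uses Try(...).recover { ... }.get to read an optional file with a fallback string instead of branching on file existence. A standalone sketch of the same idiom (the readOrElse helper is illustrative, not part of the patch):

    import scala.io.Source
    import scala.util.Try

    // read a small file if present, otherwise fall back; as in the patch, the
    // Source handle is not closed explicitly, which is acceptable for a one-off
    // startup read
    def readOrElse(path: String, fallback: String): String =
      Try(Source.fromFile(path).mkString).recover { case _ => fallback }.get

    // readOrElse("./release_info", "release info not found\n")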

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
new file mode 100644
index 0000000..2559fd1
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
@@ -0,0 +1,89 @@
+package org.apache.s2graph.rest.play.actors
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import org.apache.s2graph.core.ExceptionHandler._
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphElement}
+import org.apache.s2graph.rest.play.actors.Protocol.FlushAll
+import org.apache.s2graph.rest.play.config.Config
+import play.api.Play.current
+import play.api.libs.concurrent.Akka
+
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+object Protocol {
+
+  case object Flush
+
+  case object FlushAll
+
+}
+
+object QueueActor {
+  /** writes are throttled here, so the number of actors is kept to a fixed constant */
+  var router: ActorRef = _
+
+  //    Akka.system.actorOf(props(), name = "queueActor")
+  def init(s2: Graph) = {
+    router = Akka.system.actorOf(props(s2))
+  }
+
+  def shutdown() = {
+    router ! FlushAll
+    Akka.system.shutdown()
+    Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2)
+  }
+
+  def props(s2: Graph): Props = Props(classOf[QueueActor], s2)
+}
+
+class QueueActor(s2: Graph) extends Actor with ActorLogging {
+
+  import Protocol._
+
+  implicit val ec = context.system.dispatcher
+  //  logger.error(s"QueueActor: $self")
+  val queue = mutable.Queue.empty[GraphElement]
+  var queueSize = 0L
+  val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE
+  val timeUnitInMillis = 10
+  val rateLimitTimeStep = 1000 / timeUnitInMillis
+  val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep
+
+
+  context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush)
+
+  override def receive: Receive = {
+    case element: GraphElement =>
+
+      if (queueSize > maxQueueSize) {
+        ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None))
+      } else {
+        queueSize += 1L
+        queue.enqueue(element)
+      }
+
+    case Flush =>
+      val elementsToFlush =
+        if (queue.size < rateLimit) queue.dequeueAll(_ => true)
+        else (0 until rateLimit).map(_ => queue.dequeue())
+
+      val flushSize = elementsToFlush.size
+
+      queueSize -= elementsToFlush.length
+      s2.mutateElements(elementsToFlush)
+
+      if (flushSize > 0) {
+        logger.info(s"flush: $flushSize, $queueSize")
+      }
+
+    case FlushAll =>
+      s2.mutateElements(queue)
+      context.stop(self)
+
+    case _ => logger.error("unknown protocol")
+  }
+}
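
The throttling above is tick-based: a Flush message fires every timeUnitInMillis (10 ms), i.e. 100 times per second, and the configured per-second rate limit is divided evenly across those ticks. A small sketch of the budget arithmetic using the Config defaults (the literal 1000 stands in for local.queue.actor.rate.limit):

    val timeUnitInMillis  = 10
    val rateLimitTimeStep = 1000 / timeUnitInMillis            // 100 Flush ticks per second
    val perSecondLimit    = 1000                               // default local.queue.actor.rate.limit
    val rateLimit         = perSecondLimit / rateLimitTimeStep // 10 elements drained per tick

    // Each Flush drains at most `rateLimit` elements from the queue; once the queue
    // grows past local.queue.actor.max.queue.size, new elements are diverted to the
    // Kafka fail topic instead of being enqueued.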

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
new file mode 100644
index 0000000..0254506
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/Config.scala
@@ -0,0 +1,41 @@
+package org.apache.s2graph.rest.play.config
+
+import play.api.Play
+
+object Config {
+  // HBASE
+  lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost")
+
+
+  // HBASE CLIENT
+  lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort
+  lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000)
+  lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3)
+
+  // PHASE
+  lazy val PHASE = conf.getString("phase").getOrElse("dev")
+  lazy val conf = Play.current.configuration
+
+  // CACHE
+  lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600)
+  lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000)
+
+  //KAFKA
+  lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("kafka.metadata.broker.list").getOrElse("localhost")
+  lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0)
+  lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
+  lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
+  lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
+
+  // is query or write
+  lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)
+  lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true)
+
+
+  // query limit per step
+  lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300)
+
+  // local queue actor
+  lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("local.queue.actor.max.queue.size").getOrElse(10000)
+  lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("local.queue.actor.rate.limit").getOrElse(1000)
+}
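
Every key above follows the same shape: Play's Configuration getters return Option, and getOrElse supplies a default so a missing key never fails at startup. A sketch of the equivalent lookup against a plain Typesafe Config, with a hypothetical getIntOrElse helper that is not part of the patch:

    import com.typesafe.config.{Config => TypesafeConfig, ConfigFactory}

    // return the configured value if the path exists, otherwise the default
    def getIntOrElse(conf: TypesafeConfig, key: String, default: Int): Int =
      if (conf.hasPath(key)) conf.getInt(key) else default

    val conf = ConfigFactory.parseString("local.queue.actor.rate.limit = 500")

    getIntOrElse(conf, "local.queue.actor.rate.limit", 1000)  // 500, from the config
    getIntOrElse(conf, "query.hard.limit", 300)               // 300, the fallback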

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala b/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala
new file mode 100644
index 0000000..0bcdd81
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/config/CounterConfig.scala
@@ -0,0 +1,10 @@
+package org.apache.s2graph.rest.play.config
+
+/**
+ * Created by hsleep([email protected]) on 15. 9. 3..
+ */
+object CounterConfig {
+  // kafka
+  lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}"
+  lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}"
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
new file mode 100644
index 0000000..00e0741
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -0,0 +1,424 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.functional.syntax._
+import play.api.libs.json._
+import play.api.mvc
+import play.api.mvc.{Action, Controller}
+
+import scala.util.{Failure, Success, Try}
+
+object AdminController extends Controller {
+
+  import ApplicationController._
+  private val management: Management = org.apache.s2graph.rest.play.Global.storageManagement
+  private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
+
+  /**
+   * admin message formatter
+   * @tparam T
+   */
+  trait AdminMessageFormatter[T] {
+    def toJson(msg: T): JsValue
+  }
+
+  object AdminMessageFormatter {
+    implicit def jsValueToJson[T <: JsValue] = new AdminMessageFormatter[T] {
+      def toJson(js: T) = js
+    }
+
+    implicit val stringToJson = new AdminMessageFormatter[String] {
+      def toJson(js: String) = Json.obj("message" -> js)
+    }
+  }
+
+  def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) = {
+    val formatter = implicitly[AdminMessageFormatter[T]]
+    f(formatter.toJson(message))
+  }
+
+  /**
+   * ok response
+   * @param message
+   * @tparam T
+   * @return
+   */
+  def ok[T: AdminMessageFormatter](message: T) = {
+    val formatter = implicitly[AdminMessageFormatter[T]]
+    Ok(formatter.toJson(message)).as(applicationJsonHeader)
+  }
+
+  /**
+   * bad request response
+   * @param message
+   * @tparam T
+   * @return
+   */
+  def bad[T: AdminMessageFormatter](message: T) = {
+    val formatter = implicitly[AdminMessageFormatter[T]]
+    BadRequest(formatter.toJson(message)).as(applicationJsonHeader)
+  }
+
+  /**
+   * not found response
+   * @param message
+   * @tparam T
+   * @return
+   */
+  def notFound[T: AdminMessageFormatter](message: T) = {
+    val formatter = implicitly[AdminMessageFormatter[T]]
+    NotFound(formatter.toJson(message)).as(applicationJsonHeader)
+  }
+
+  private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match {
+    case Success(m) =>
+      val ret = callback(m)
+      logger.info(ret.toString)
+      ok(ret)
+    case Failure(error) =>
+      logger.error(error.getMessage, error)
+      error match {
+        case JsResultException(e) => bad(JsError.toFlatJson(e))
+        case _ => bad(error.getMessage)
+      }
+  }
+
+  def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match {
+    case Some(m) => ok(callback(m))
+    case None => notFound("not found")
+  }
+
+  /**
+   * load all model cache
+   * @return
+   */
+  def loadCache() = Action { request =>
+    val startTs = System.currentTimeMillis()
+
+    if (!ApplicationController.isHealthy) {
+      loadCacheInner()
+    }
+
+    ok(s"${System.currentTimeMillis() - startTs}")
+  }
+
+  def loadCacheInner() = {
+    Service.findAll()
+    ServiceColumn.findAll()
+    Label.findAll()
+    LabelMeta.findAll()
+    LabelIndex.findAll()
+    ColumnMeta.findAll()
+  }
+
+  /**
+   * read
+   */
+
+  /**
+   * get service info
+   * @param serviceName
+   * @return
+   */
+  def getService(serviceName: String) = Action { request =>
+    val serviceOpt = Management.findService(serviceName)
+    optionResponse(serviceOpt)(_.toJson)
+  }
+
+  /**
+   * get label info
+   * @param labelName
+   * @return
+   */
+  def getLabel(labelName: String) = Action { request =>
+    val labelOpt = Management.findLabel(labelName)
+    optionResponse(labelOpt)(_.toJson)
+  }
+
+  /**
+   * get all labels of service
+   * @param serviceName
+   * @return
+   */
+  def getLabels(serviceName: String) = Action { request =>
+    Service.findByName(serviceName) match {
+      case None => notFound(s"Service $serviceName not found")
+      case Some(service) =>
+        val src = Label.findBySrcServiceId(service.id.get)
+        val tgt = Label.findByTgtServiceId(service.id.get)
+
+        ok(Json.obj("from" -> src.map(_.toJson), "to" -> tgt.map(_.toJson)))
+    }
+  }
+
+  /**
+   * get service columns
+   * @param serviceName
+   * @param columnName
+   * @return
+   */
+  def getServiceColumn(serviceName: String, columnName: String) = Action { request =>
+    val serviceColumnOpt = for {
+      service <- Service.findByName(serviceName)
+      serviceColumn <- ServiceColumn.find(service.id.get, columnName, useCache = false)
+    } yield serviceColumn
+
+    optionResponse(serviceColumnOpt)(_.toJson)
+  }
+
+  /**
+   * create
+   */
+
+  /**
+   * create service
+   * @return
+   */
+  def createService() = Action(parse.json) { request =>
+    val serviceTry = createServiceInner(request.body)
+    tryResponse(serviceTry)(_.toJson)
+  }
+
+
+  def createServiceInner(jsValue: JsValue) = {
+    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue)
+    management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+  }
+
+  /**
+   * create label
+   * @return
+   */
+  def createLabel() = Action(parse.json) { request =>
+    val ret = createLabelInner(request.body)
+    tryResponse(ret)(_.toJson)
+  }
+
+
+  def createLabelInner(json: JsValue) = for {
+    labelArgs <- requestParser.toLabelElements(json)
+    label <- (management.createLabel _).tupled(labelArgs)
+  } yield label
+
+  /**
+   * add index
+   * @return
+   */
+  def addIndex() = Action(parse.json) { request =>
+    val ret = addIndexInner(request.body)
+    tryResponse(ret)(_.label + " is updated")
+  }
+
+  def addIndexInner(json: JsValue) = for {
+    (labelName, indices) <- requestParser.toIndexElements(json)
+    label <- Management.addIndex(labelName, indices)
+  } yield label
+
+  /**
+   * create service column
+   * @return
+   */
+  def createServiceColumn() = Action(parse.json) { request =>
+    val serviceColumnTry = createServiceColumnInner(request.body)
+    tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) => Json.obj("metas" -> columns.map(_.toJson)) }
+  }
+
+  def createServiceColumnInner(jsValue: JsValue) = for {
+    (serviceName, columnName, columnType, props) <- requestParser.toServiceColumnElements(jsValue)
+    serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props)
+  } yield serviceColumn
+
+  /**
+   * delete
+   */
+
+  /**
+   * delete label
+   * @param labelName
+   * @return
+   */
+  def deleteLabel(labelName: String) = Action { request =>
+    val deleteLabelTry = deleteLabelInner(labelName)
+    tryResponse(deleteLabelTry)(labelName => labelName + " is deleted")
+  }
+
+  def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName)
+
+  /**
+   * delete serviceColumn
+   * @param serviceName
+   * @param columnName
+   * @return
+   */
+  def deleteServiceColumn(serviceName: String, columnName: String) = Action { request =>
+    val serviceColumnTry = deleteServiceColumnInner(serviceName, columnName)
+    tryResponse(serviceColumnTry)(columnName => columnName + " is deleted")
+  }
+
+  def deleteServiceColumnInner(serviceName: String, columnName: String) =
+    Management.deleteColumn(serviceName, columnName)
+
+  /**
+   * update
+   */
+
+  /**
+   * add Prop to label
+   * @param labelName
+   * @return
+   */
+  def addProp(labelName: String) = Action(parse.json) { request =>
+    val labelMetaTry = addPropInner(labelName, request.body)
+    tryResponse(labelMetaTry)(_.toJson)
+  }
+
+  def addPropInner(labelName: String, js: JsValue) = for {
+    prop <- requestParser.toPropElements(js)
+    labelMeta <- Management.addProp(labelName, prop)
+  } yield labelMeta
+
+  /**
+   * add prop to serviceColumn
+   * @param serviceName
+   * @param columnName
+   * @return
+   */
+  def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request =>
+    addServiceColumnPropInner(serviceName, columnName)(request.body) match {
+      case None => bad(s"can`t find service with $serviceName or can`t find 
serviceColumn with $columnName")
+      case Some(m) => Ok(m.toJson).as(applicationJsonHeader)
+    }
+  }
+
+  def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = {
+    for {
+      service <- Service.findByName(serviceName)
+      serviceColumn <- ServiceColumn.find(service.id.get, columnName)
+      prop <- requestParser.toPropElements(js).toOption
+    } yield {
+      ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.defaultValue)
+    }
+  }
+
+  /**
+   * add props to serviceColumn
+   * @param serviceName
+   * @param columnName
+   * @return
+   */
+  def addServiceColumnProps(serviceName: String, columnName: String) = Action(parse.json) { request =>
+    val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject])
+    val newProps = for {
+      js <- jsObjs
+      newProp <- addServiceColumnPropInner(serviceName, columnName)(js)
+    } yield newProp
+    ok(s"${newProps.size} is added.")
+  }
+
+  /**
+   * copy label
+   * @param oldLabelName
+   * @param newLabelName
+   * @return
+   */
+  def copyLabel(oldLabelName: String, newLabelName: String) = Action { request =>
+    val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
+    tryResponse(copyTry)(_.label + "created")
+  }
+
+  /**
+   * rename label
+   * @param oldLabelName
+   * @param newLabelName
+   * @return
+   */
+  def renameLabel(oldLabelName: String, newLabelName: String) = Action { request =>
+    Label.findByName(oldLabelName) match {
+      case None => NotFound.as(applicationJsonHeader)
+      case Some(label) =>
+        Management.updateLabelName(oldLabelName, newLabelName)
+        ok(s"Label was updated")
+    }
+  }
+
+  /**
+   * update HTable for a label
+   * @param labelName
+   * @param newHTableName
+   * @return
+   */
+  def updateHTable(labelName: String, newHTableName: String) = Action { request =>
+    val updateTry = Management.updateHTable(labelName, newHTableName)
+    tryResponse(updateTry)(_.toString + " label(s) updated.")
+  }
+
+
+  case class HTableParams(cluster: String, hTableName: String,
+    preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
+
+    override def toString(): String = {
+      s"""HtableParams
+         |-- cluster : $cluster
+         |-- hTableName : $hTableName
+         |-- preSplitSize : $preSplitSize
+         |-- hTableTTL : $hTableTTL
+         |-- compressionAlgorithm : $compressionAlgorithm
+         |""".stripMargin
+    }
+  }
+
+  implicit object HTableParamsJsonConverter extends Format[HTableParams] {
+    def reads(json: JsValue): JsResult[HTableParams] = (
+    (__ \ "cluster").read[String] and
+    (__ \ "hTableName").read[String] and
+    (__ \ "preSplitSize").read[Int] and
+    (__ \ "hTableTTL").readNullable[Int] and
+      (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply 
_).reads(json)
+
+    def writes(o: HTableParams): JsValue = Json.obj(
+      "cluster" -> o.cluster,
+      "hTableName" -> o.hTableName,
+      "preSplitSize" -> o.preSplitSize,
+      "hTableTTL" -> o.hTableTTL,
+      "compressionAlgorithm" -> o.compressionAlgorithm
+    )
+  }
+
+  implicit object JsErrorJsonWriter extends Writes[JsError] {
+    def writes(o: JsError): JsValue = Json.obj(
+      "errors" -> JsArray(
+        o.errors.map {
+          case (path, validationErrors) => Json.obj(
+            "path" -> Json.toJson(path.toString()),
+            "validationErrors" -> JsArray(validationErrors.map(validationError 
=> Json.obj(
+              "message" -> JsString(validationError.message),
+              "args" -> JsArray(validationError.args.map(_ match {
+                case x: Int => JsNumber(x)
+                case x => JsString(x.toString)
+              }))
+            )))
+          )
+        }
+      )
+    )
+  }
+
+  def createHTable() = Action { request =>
+
+    //    Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
+    request.body.asJson.map(_.validate[HTableParams] match {
+      case JsSuccess(hTableParams, _) => {
+        management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
+          hTableParams.preSplitSize, hTableParams.hTableTTL,
+          hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
+        logger.info(hTableParams.toString())
+        ok(s"HTable was created.")
+      }
+      case err@JsError(_) => bad(Json.toJson(err))
+    }).getOrElse(bad("Invalid Json."))
+
+  }
+}
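
AdminMessageFormatter is a small typeclass: ok, bad, and notFound accept either a JsValue, which passes through untouched, or a plain String, which gets wrapped as a message object. A stripped-down sketch of that resolution with the Play Result wrapping omitted (render is an illustrative stand-in for ok; assumes play-json):

    import play.api.libs.json.{JsValue, Json}

    trait MessageFormatter[T] { def toJson(msg: T): JsValue }

    object MessageFormatter {
      // any JsValue (JsObject, JsArray, ...) is already a JSON body
      implicit def jsValueToJson[T <: JsValue]: MessageFormatter[T] =
        new MessageFormatter[T] { def toJson(js: T) = js }

      // bare strings become {"message": "..."}
      implicit val stringToJson: MessageFormatter[String] =
        new MessageFormatter[String] { def toJson(s: String) = Json.obj("message" -> s) }
    }

    def render[T: MessageFormatter](message: T): JsValue =
      implicitly[MessageFormatter[T]].toJson(message)

    // render("label is updated")   == {"message":"label is updated"}
    // render(Json.obj("id" -> 1))  == {"id":1}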

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
new file mode 100644
index 0000000..3482f6d
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ApplicationController.scala
@@ -0,0 +1,101 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.PostProcess
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.iteratee.Enumerator
+import play.api.libs.json.{JsString, JsValue}
+import play.api.mvc._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object ApplicationController extends Controller {
+
+  var isHealthy = true
+  var deployInfo = ""
+  val applicationJsonHeader = "application/json"
+
+  val jsonParser: BodyParser[JsValue] = s2parse.json
+
+  val jsonText: BodyParser[String] = s2parse.jsonText
+
+  private def badQueryExceptionResults(ex: Exception) =
+    Future.successful(BadRequest(PostProcess.badRequestResults(ex)).as(applicationJsonHeader))
+
+  private def errorResults =
+    Future.successful(Ok(PostProcess.emptyResults).as(applicationJsonHeader))
+
+  def requestFallback(body: String): PartialFunction[Throwable, Future[Result]] = {
+    case e: BadQueryException =>
+      logger.error(s"{$body}, ${e.getMessage}", e)
+      badQueryExceptionResults(e)
+    case e: Exception =>
+      logger.error(s"${body}, ${e.getMessage}", e)
+      errorResults
+  }
+
+  def updateHealthCheck(isHealthy: Boolean) = Action { request =>
+    this.isHealthy = isHealthy
+    Ok(this.isHealthy + "\n")
+  }
+
+  def healthCheck() = withHeader(parse.anyContent) { request =>
+    if (isHealthy) Ok(deployInfo)
+    else NotFound
+  }
+
+  def jsonResponse(json: JsValue, headers: (String, String)*) =
+    if (ApplicationController.isHealthy) {
+      Ok(json).as(applicationJsonHeader).withHeaders(headers: _*)
+    } else {
+      Result(
+        header = ResponseHeader(OK),
+        body = Enumerator(json.toString.getBytes()),
+        connection = HttpConnection.Close
+      ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*)
+    }
+
+  def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = {
+    val duration = System.currentTimeMillis() - startedAt
+    val isQueryRequest = result.header.headers.contains("result_size")
+    val resultSize = result.header.headers.getOrElse("result_size", "-1")
+
+    try {
+      val body = request.body match {
+        case AnyContentAsJson(jsValue) => jsValue match {
+          case JsString(str) => str
+          case _ => jsValue.toString
+        }
+        case AnyContentAsEmpty => ""
+        case _ => request.body.toString
+      }
+
+      val str =
+        if (isQueryRequest)
+          s"${request.method} ${request.uri} took ${duration} ms 
${result.header.status} ${resultSize} ${body}"
+        else
+          s"${request.method} ${request.uri} took ${duration} ms 
${result.header.status} ${resultSize} ${body}"
+
+      str
+    } finally {
+      /* pass */
+    }
+  }
+
+  def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) =
+    Action.async(bodyParser) { request =>
+      val startedAt = System.currentTimeMillis()
+      block(request).map { r =>
+        logger.info(toLogMessage(request, r)(startedAt))
+        r
+      }
+    }
+
+  def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) =
+    Action(bodyParser) { request: Request[A] =>
+      val startedAt = System.currentTimeMillis()
+      val r = block(request)
+      logger.info(toLogMessage(request, r)(startedAt))
+      r
+    }
+}
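
requestFallback is built to be chained onto a query Future with recoverWith, so a BadQueryException degrades to a bad-request body and any other exception to an empty result instead of a raw 500. A simplified sketch of that shape using String results in place of Play Results (BadQueryException is redefined locally just for the example):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future}

    final class BadQueryException(msg: String) extends Exception(msg)

    def requestFallback(body: String): PartialFunction[Throwable, Future[String]] = {
      case e: BadQueryException => Future.successful(s"bad request: ${e.getMessage} (body=$body)")
      case _: Exception         => Future.successful("empty result")
    }

    val failed: Future[String] = Future.failed(new BadQueryException("unknown label"))

    // the fallback turns the failure into a successful, loggable response
    Await.result(failed.recoverWith(requestFallback("{}")), 1.second)
    // => "bad request: unknown label (body={})"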
