http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
new file mode 100644
index 0000000..4587d59
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -0,0 +1,744 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.s2graph.core.ExceptionHandler
+import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.counter
+import org.apache.s2graph.counter.config.S2CounterConfig
+import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
+import org.apache.s2graph.counter.core._
+import org.apache.s2graph.counter.core.v1.{RankingStorageRedis, 
ExactStorageAsyncHBase}
+import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, 
ExactStorageGraph}
+import org.apache.s2graph.counter.models.Counter.ItemType
+import org.apache.s2graph.counter.models.{Counter, CounterModel}
+import org.apache.s2graph.counter.util.{ReduceMapValue, CartesianProduct, 
UnitConverter}
+import org.apache.s2graph.rest.play.config.CounterConfig
+import org.apache.s2graph.rest.play.models._
+import play.api.Play
+import play.api.libs.json.Reads._
+import play.api.libs.json._
+import play.api.mvc.{Action, Controller, Request}
+import scala.concurrent.Future
+import scala.util.{Failure, Success, Try}
+
+object CounterController extends Controller {
+  import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+  // Underlying configuration object of the running Play application; handed to every counter component.
+  val config = Play.current.configuration.underlying
+  val s2config = new S2CounterConfig(config)
+
+  // Exact-count engines, one per registered counter version.
+  private val exactCounterMap = Map(
+    counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
+    counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
+  )
+
+  // Ranking engines, one per registered counter version.
+  private val rankingCounterMap = Map(
+    counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
+    counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
+  )
+
+  // Table-name prefix per counter version, used when composing storage table names.
+  private val tablePrefixMap = Map(
+    counter.VERSION_1 -> "s2counter",
+    counter.VERSION_2 -> "s2counter_v2"
+  )
+
+  // Version lookups go through Map.apply, so an unregistered version raises NoSuchElementException.
+  private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
+
+  private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)
+
+  // Instantiated on first access instead of at controller initialization.
+  lazy val counterModel = new CounterModel(config)
+
+  /**
+   * Reads a single query-string parameter from the implicit request.
+   *
+   * @param key     name of the query-string parameter
+   * @param default value to use when the request does not carry `key`
+   * @return the parameter value, or `default` when it is absent
+   */
+  def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String =
+    request.getQueryString(key) match {
+      case Some(value) => value
+      case None => default
+    }
+
+  /**
+   * JSON serializer for [[Counter]] policies.
+   * The rate action/base ids are expanded into `{"service", "action"}` objects through an uncached
+   * `counterModel.findById` lookup; an unset or unknown id yields an empty Option.
+   */
+  implicit val counterWrites = new Writes[Counter] {
+    override def writes(o: Counter): JsValue = {
+      val rateAction = for {
+        actionId <- o.rateActionId
+        actionPolicy <- counterModel.findById(actionId, useCache = false)
+      } yield Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action)
+
+      val rateBase = for {
+        baseId <- o.rateBaseId
+        basePolicy <- counterModel.findById(baseId, useCache = false)
+      } yield Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action)
+
+      Json.obj(
+        "version" -> o.version.toInt,
+        "autoComb" -> o.autoComb,
+        "dimension" -> o.dimension,
+        "useProfile" -> o.useProfile,
+        "bucketImpId" -> o.bucketImpId,
+        "useRank" -> o.useRank,
+        "intervalUnit" -> o.intervalUnit,
+        "ttl" -> o.ttl,
+        "dailyTtl" -> o.dailyTtl,
+        "rateAction" -> rateAction,
+        "rateBase" -> rateBase,
+        "rateThreshold" -> o.rateThreshold
+      )
+    }
+  }
+
+  /**
+   * Registers a new counter policy for `service`/`action` and prepares its storage.
+   *
+   * Optional JSON body fields (default in parentheses): `version` (counter.VERSION_2), `autoComb` (true),
+   * `dimension` (""), `useProfile` (false), `bucketImpId`, `useRank` (true), `intervalUnit`,
+   * `ttl` (2 * 24 * 60 * 60, i.e. two days), `dailyTtl`, `rateAction` / `rateBase`
+   * (`{"service", "action"}` of an existing policy), `rateThreshold`, and `itemType` ("STRING"; only
+   * consulted when no label named `action` exists).
+   *
+   * Responds 200 "created" on success, 200 "already exist" when the policy is already registered, and
+   * 400 when the requested version has no registered storage.
+   */
+  def createAction(service: String, action: String) = Action(s2parse.json) { implicit request =>
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case Some(_) =>
+        Ok(Json.toJson(Map("msg" -> s"already exist $service/$action")))
+      case None =>
+        val body = request.body
+        val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(counter.VERSION_2)
+
+        if (!tablePrefixMap.contains(version)) {
+          // Without this guard the version lookups below fail with NoSuchElementException (HTTP 500).
+          BadRequest(Json.toJson(Map("msg" -> s"unsupported version $version")))
+        } else {
+          val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true)
+          val dimension = (body \ "dimension").asOpt[String].getOrElse("")
+          val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false)
+          val bucketImpId = (body \ "bucketImpId").asOpt[String]
+          val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true)
+
+          val intervalUnit = (body \ "intervalUnit").asOpt[String]
+          // 2 day
+          val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60)
+          val dailyTtl = (body \ "dailyTtl").asOpt[Int]
+
+          val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
+          val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
+          val rateThreshold = (body \ "rateThreshold").asOpt[Int]
+
+          // Resolve the referenced policies to their ids; any missing piece leaves the id unset.
+          val rateActionId = for {
+            actionMap <- rateAction
+            refService <- actionMap.get("service")
+            refAction <- actionMap.get("action")
+            refPolicy <- counterModel.findByServiceAction(refService, refAction)
+          } yield refPolicy.id
+          val rateBaseId = for {
+            baseMap <- rateBase
+            refService <- baseMap.get("service")
+            refAction <- baseMap.get("action")
+            refPolicy <- counterModel.findByServiceAction(refService, refAction)
+          } yield refPolicy.id
+
+          // <prefix>_<service>_<ttl>[_<dailyTtl>]
+          val hbaseTable = (Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl).mkString("_")
+
+          // A label named like the action dictates the item type; otherwise the body decides.
+          val itemType = Label.findByName(action, useCache = false) match {
+            case Some(label) =>
+              ItemType.withName(label.tgtColumnType.toUpperCase)
+            case None =>
+              val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING")
+              ItemType.withName(strItemType.toUpperCase)
+          }
+          val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension,
+            useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
+            rateActionId, rateBaseId, rateThreshold)
+
+          // prepare exact storage
+          exactCounter(version).prepare(policy)
+          if (useRank) {
+            // prepare ranking storage
+            rankingCounter(version).prepare(policy)
+          }
+          counterModel.createServiceAction(policy)
+          Ok(Json.toJson(Map("msg" -> s"created $service/$action")))
+        }
+    }
+  }
+
+  /**
+   * Returns the stored counter policy for `service`/`action` as JSON (uncached lookup),
+   * or 404 when no such policy exists.
+   */
+  def getAction(service: String, action: String) = Action { request =>
+    val found = counterModel.findByServiceAction(service, action, useCache = false)
+    found match {
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+      case Some(policy) =>
+        Ok(Json.toJson(policy))
+    }
+  }
+
+  /**
+   * Updates the adjustable settings of an existing counter policy.
+   *
+   * Every field missing from the JSON body keeps the value stored in the old policy. The id, use flag,
+   * version, item type, ttl, dailyTtl and hbase table are always carried over unchanged.
+   * Responds 404 when the policy does not exist.
+   */
+  def updateAction(service: String, action: String) = Action(s2parse.json) { request =>
+    // Resolves a {"service", "action"} reference from the body to that policy's id (uncached lookup).
+    def referencedPolicyId(reference: Option[Map[String, String]]) = for {
+      refMap <- reference
+      refService <- refMap.get("service")
+      refAction <- refMap.get("action")
+      refPolicy <- counterModel.findByServiceAction(refService, refAction, useCache = false)
+    } yield refPolicy.id
+
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+      case Some(oldPolicy) =>
+        val body = request.body
+        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb)
+        val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension)
+        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile)
+        val bucketImpId = (body \ "bucketImpId").asOpt[String].orElse(oldPolicy.bucketImpId)
+        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank)
+        val intervalUnit = (body \ "intervalUnit").asOpt[String].orElse(oldPolicy.intervalUnit)
+
+        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
+        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
+        val rateThreshold = (body \ "rateThreshold").asOpt[Int].orElse(oldPolicy.rateThreshold)
+
+        // An unresolvable reference falls back to the previously stored id.
+        val rateActionId = referencedPolicyId(rateAction).orElse(oldPolicy.rateActionId)
+        val rateBaseId = referencedPolicyId(rateBase).orElse(oldPolicy.rateBaseId)
+
+        // new counter
+        val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action,
+          oldPolicy.itemType, autoComb = autoComb, dimension,
+          useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl,
+          oldPolicy.hbaseTable, intervalUnit,
+          rateActionId, rateBaseId, rateThreshold)
+
+        counterModel.updateServiceAction(policy)
+        Ok(Json.toJson(Map("msg" -> s"updated $service/$action")))
+    }
+  }
+
+  /**
+   * Prepares storage of another counter version for an existing policy (used for data migration).
+   *
+   * The JSON body must carry an integer `version`. When it differs from the policy's current version,
+   * the exact storage (and the ranking storage if the policy uses ranking) of that version is prepared
+   * under a table name derived from the new version prefix. The stored policy itself is not modified here.
+   *
+   * Responds 404 when the policy does not exist and 400 when `version` is missing, not an integer,
+   * or has no registered storage.
+   */
+  def prepareAction(service: String, action: String) = Action(s2parse.json) { request =>
+    // for data migration
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+      case Some(policy) =>
+        (request.body \ "version").asOpt[Int].map(_.toByte) match {
+          case None =>
+            // Previously `.as[Int]` threw JsResultException here, surfacing as HTTP 500.
+            BadRequest(Json.toJson(Map("msg" -> "version is required")))
+          case Some(version) if version == policy.version =>
+            Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action")))
+          case Some(version) if !tablePrefixMap.contains(version) =>
+            // Previously the map lookups below threw NoSuchElementException for unknown versions.
+            BadRequest(Json.toJson(Map("msg" -> s"unsupported version $version")))
+          case Some(version) =>
+            // change table name
+            val newTableName = (Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl).mkString("_")
+            val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))
+            exactCounter(version).prepare(newPolicy)
+            if (newPolicy.useRank) {
+              rankingCounter(version).prepare(newPolicy)
+            }
+            Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
+        }
+    }
+  }
+
+  /**
+   * Destroys the storage of a counter policy and removes the policy itself.
+   * The exact storage is destroyed first, then the ranking storage (only when the policy uses ranking),
+   * then the policy record. Any failure propagates to the framework. Responds 404 for an unknown policy.
+   */
+  def deleteAction(service: String, action: String) = Action.apply {
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case Some(policy) =>
+        exactCounter(policy.version).destroy(policy)
+        if (policy.useRank) {
+          rankingCounter(policy.version).destroy(policy)
+        }
+        counterModel.deleteServiceAction(policy)
+        Ok(Json.toJson(Map("msg" -> s"deleted $service/$action")))
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+    }
+  }
+
+  /**
+   * Returns the exact counts of one item.
+   *
+   * Query parameters: `interval` (falls back to `step`, then "t"; comma separated), `limit` (default 1),
+   * `sum`, `from`, `to`, and any number of `:<dimension>=v1,v2` dimension filters.
+   * When both `from` and `to` are given, the result is not truncated to `limit`.
+   * Decayed counts are returned instead of plain counts when `sum` is present and the first qualifier
+   * group holds more than one entry.
+   * Responds 404 when no policy exists for `service`/`action`.
+   */
+  def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request =>
+    val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
+      .map(IntervalUnit.withName)
+    val limit = getQueryString("limit", "1").toInt
+
+    val qsSum = request.getQueryString("sum")
+
+    val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
+    val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
+
+    // An explicit [from, to] window disables the limit.
+    val limitOpt = if (optFrom.isDefined && optTo.isDefined) None else Some(limit)
+
+    // find dimension
+    // startsWith (not charAt(0)) so that an empty query-string key cannot raise an index error.
+    lazy val dimQueryValues = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
+      (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet)
+    }
+
+    counterModel.findByServiceAction(service, action) match {
+      case Some(policy) =>
+        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
+        val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
+        val futureJs = {
+          if (tqs.head.length > 1 && qsSum.isDefined) {
+            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
+          } else {
+            getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
+          }
+        }
+        futureJs.map { jsVal =>
+          Ok(jsVal)
+        }
+      case None =>
+        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
+    }
+  }
+
+  /**
+   * Parses one element of the multi-count request body:
+   * {{{
+   * {
+   *   "service": string, "action": string, "itemIds": [string],
+   *   "intervals": [string],   // optional, defaults to ["t"]; duplicates are dropped
+   *   "limit": int,            // optional, defaults to 1
+   *   "from": long, "to": long, "sum": string,   // all optional
+   *   "dimensions": [{"key": [string]}]          // optional
+   * }
+   * }}}
+   * `service`, `action` and `itemIds` are mandatory; reading them with `as` fails when they are absent.
+   * When the same dimension key occurs in several objects, the last occurrence wins (`toMap`).
+   *
+   * @return (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
+   */
+  private def parseExactCountParam(jsValue: JsValue) = {
+    val service = (jsValue \ "service").as[String]
+    val action = (jsValue \ "action").as[String]
+    val itemIds = (jsValue \ "itemIds").as[Seq[String]]
+    val intervals = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t")).distinct.map(IntervalUnit.withName)
+    val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1)
+    val from = (jsValue \ "from").asOpt[Long]
+    val to = (jsValue \ "to").asOpt[Long]
+    val sum = (jsValue \ "sum").asOpt[String]
+    val dimensions = {
+      for {
+        dimension <- (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil)
+        (k, vs) <- dimension.fields
+      } yield {
+        k -> vs.as[Seq[String]].toSet
+      }
+    }.toMap
+    (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
+  }
+
+  /**
+   * Batch variant of the exact-count lookup.
+   *
+   * The JSON body is an array of request objects (see `parseExactCountParam`); a body that is not an
+   * array of objects produces an empty result. Requests whose policy is unknown are skipped. One result
+   * entry is produced per (request, item id) pair, in request order.
+   */
+  def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request =>
+    val jsValue = request.body
+    val futures = {
+      for {
+        jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil)
+        (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject)
+        optFrom = from.map(UnitConverter.toMillis)
+        optTo = to.map(UnitConverter.toMillis)
+        policy <- counterModel.findByServiceAction(service, action).toSeq
+        // Values below depend on the request only, so they are computed once instead of per item.
+        timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
+        tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
+        // An explicit [from, to] window disables the limit.
+        limitOpt = if (optFrom.isDefined && optTo.isDefined) None else Some(limit)
+        item <- itemIds
+      } yield {
+        if (tqs.head.length > 1 && qsSum.isDefined) {
+          getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
+        } else {
+          getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
+        }
+      }
+    }
+    Future.sequence(futures).map { rets =>
+      Ok(Json.toJson(rets))
+    }
+  }
+
+  /**
+   * Converts fetched counts into response items, one per (interval, dimension) group.
+   * Inside a group the entries are ordered by timestamp, newest first, and truncated to `limitOpt`
+   * entries when a limit is given.
+   */
+  private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = {
+    fetchedCounts.intervalWithCountMap.map { case ((interval, dimKeyValues), values) =>
+      val newestFirst = values.toSeq.sortBy { case (eq, _) => -eq.tq.ts }
+      val kept = limitOpt.map(limit => newestFirst.take(limit)).getOrElse(newestFirst)
+      val counterItems = kept.map { case (eq, value) =>
+        ExactCounterItem(eq.tq.ts, value, value.toDouble)
+      }
+      ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems)
+    }
+  }.toSeq
+
+  /**
+   * Converts decayed counts into response items: one interval item per qualifier, each holding a single
+   * counter item whose value is the score truncated to Long and whose score is the raw score.
+   */
+  private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = {
+    decayedCounts.qualifierWithCountMap.map { case (eq, score) =>
+      val single = ExactCounterItem(eq.tq.ts, score.toLong, score)
+      ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, Seq(single))
+    }
+  }.toSeq
+
+  /**
+   * Fetches the exact counts of a single item asynchronously and renders them as JSON.
+   *
+   * @param limitOpt       maximum number of entries kept per interval group, or None for no truncation
+   * @param dimQueryValues dimension filters, key -> accepted values
+   */
+  private def getExactCountToJs(policy: Counter,
+                                item: String,
+                                timeRange: Seq[(TimedQualifier, TimedQualifier)],
+                                limitOpt: Option[Int],
+                                dimQueryValues: Map[String, Set[String]]): Future[JsValue] = {
+    val fetchedFuture = exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, dimQueryValues)
+    fetchedFuture.map { fetchedSeq =>
+      val items = fetchedSeq.flatMap(fetched => fetchedToResult(fetched, limitOpt))
+      val meta = ExactCounterResultMeta(policy.service, policy.action, item)
+      Json.toJson(ExactCounterResult(meta, items))
+    }
+  }
+
+  /**
+   * Fetches the decayed counts of a single item asynchronously and renders them as JSON.
+   * Only the first element of the fetched sequence is used; `head` fails the future when the storage
+   * returns an empty sequence.
+   */
+  private def getDecayedCountToJs(policy: Counter,
+                                  item: String,
+                                  timeRange: Seq[(TimedQualifier, TimedQualifier)],
+                                  dimQueryValues: Map[String, Set[String]],
+                                  qsSum: Option[String]): Future[JsValue] = {
+    exactCounter(policy.version)
+      .getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum)
+      .map { decayedSeq =>
+        val first = decayedSeq.head
+        val resultMeta = ExactCounterResultMeta(policy.service, policy.action, first.exactKey.itemKey)
+        Json.toJson(ExactCounterResult(resultMeta, decayedToResult(first)))
+      }
+  }
+
+  /**
+   * Returns the top-k ranking of a counter.
+   *
+   * Query parameters: `interval` (falls back to `step`, then "t"; comma separated), `limit` (default 1),
+   * `k` (default 10), `sum`, `to`, and any number of `:<dimension>=v1,v2` dimension selectors. The
+   * cartesian product of the dimension values is queried.
+   * A summed ranking is computed only when `sum` is present and more than one time qualifier is selected.
+   *
+   * Responds 404 for an unknown policy and 501 when the ranking storage raises
+   * UnsupportedOperationException, whether synchronously or inside the returned future.
+   */
+  def getRankingCountAsync(service: String, action: String) = Action.async { implicit request =>
+    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
+      .map(IntervalUnit.withName)
+    lazy val limit = getQueryString("limit", "1").toInt
+    lazy val kValue = getQueryString("k", "10").toInt
+
+    lazy val qsSum = request.getQueryString("sum")
+
+    lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
+
+    // find dimension
+    // startsWith (not charAt(0)) so that an empty query-string key cannot raise an index error.
+    lazy val dimensionMap = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
+      (k.substring(1), v.mkString(",").split(',').toList)
+    }
+
+    val dimensions = {
+      for {
+        values <- CartesianProduct(dimensionMap.values.toList).toSeq
+      } yield {
+        dimensionMap.keys.zip(values).toMap
+      }
+    }
+
+    def notImplemented(e: UnsupportedOperationException) =
+      NotImplemented(Json.toJson(
+        Map("msg" -> e.getMessage)
+      ))
+
+    counterModel.findByServiceAction(service, action) match {
+      case Some(policy) =>
+        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo)
+        val dimKeys = {
+          for {
+            dimension <- dimensions
+          } yield {
+            dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension)))
+          }
+        }
+
+        // if tqs has only 1 tq, do not apply sum function
+        try {
+          val rankResult = {
+            if (tqs.length > 1 && qsSum.isDefined) {
+              getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum)
+            } else {
+              // no summary
+              Future.successful(getRankCounterResult(policy, dimKeys, kValue))
+            }
+          }
+
+          rankResult.map { result =>
+            Ok(Json.toJson(result))
+          }.recover {
+            // The surrounding try only sees synchronous failures; cover the asynchronous path as well.
+            case e: UnsupportedOperationException => notImplemented(e)
+          }
+        } catch {
+          case e: UnsupportedOperationException =>
+            Future.successful(notImplemented(e))
+        }
+      case None =>
+        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
+    }
+  }
+
+  /**
+   * Deletes ranking entries of a counter.
+   *
+   * Query parameters: `interval` (falls back to `step`, then "t"; comma separated), `limit` (default 1),
+   * and any number of `:<dimension>=v1,v2` dimension selectors. One ranking key is deleted per
+   * (dimension combination, time qualifier) pair; the response lists the deleted keys.
+   * Responds 404 for an unknown policy.
+   */
+  def deleteRankingCount(service: String, action: String) = Action.async { implicit request =>
+    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
+      .map(IntervalUnit.withName)
+    lazy val limit = getQueryString("limit", "1").toInt
+
+    // find dimension
+    // startsWith (not charAt(0)) so that an empty query-string key cannot raise an index error.
+    lazy val dimensionMap = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
+      (k.substring(1), v.mkString(",").split(',').toList)
+    }
+
+    val dimensions = {
+      for {
+        values <- CartesianProduct(dimensionMap.values.toList).toSeq
+      } yield {
+        dimensionMap.keys.zip(values).toMap
+      }
+    }
+
+    Future {
+      counterModel.findByServiceAction(service, action) match {
+        case Some(policy) =>
+          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit)
+          val keys = {
+            for {
+              dimension <- dimensions
+              tq <- tqs
+            } yield {
+              RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension))
+            }
+          }
+
+          for {
+            key <- keys
+          } {
+            rankingCounter(policy.version).delete(key)
+          }
+
+          Ok(JsObject(
+            Seq(
+              ("msg", Json.toJson(s"delete ranking in $service.$action")),
+              ("items", Json.toJson({
+                for {
+                  key <- keys
+                } yield {
+                  s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}"
+                }
+              }))
+            )
+          ))
+        case None =>
+          NotFound(Json.toJson(
+            Map("msg" -> s"$service.$action not found")
+          ))
+      }
+    }
+  }
+
+  // Combines two key -> RateRankingValue maps using RateRankingValue.reduce;
+  // RateRankingValue(-1, -1) is the value handed to the reducer as its default.
+  val reduceRateRankingValue =
+    new ReduceMapValue[ExactKeyTrait, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1))
+
+  /**
+   * Fetches decayed counts for several items over one time range and flattens the result into
+   * (exact key, score) pairs, one pair per qualifier score of each item.
+   * Each single-valued dimension is widened to a one-element value set for the storage query.
+   */
+  private def getDecayedCountsAsync(policy: Counter,
+                                    items: Seq[String],
+                                    timeRange: (TimedQualifier, TimedQualifier),
+                                    dimension: Map[String, String],
+                                    qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = {
+    val dimQueryValues = dimension.mapValues(s => Set(s))
+    exactCounter(policy.version)
+      .getDecayedCountsAsync(policy, items, Seq(timeRange), dimQueryValues, qsSum)
+      .map { decayedSeq =>
+        decayedSeq.flatMap { case DecayedCounts(exactKey, qcMap) =>
+          qcMap.values.map(value => exactKey -> value)
+        }
+      }
+  }
+
+  /**
+   * Scores items of a rate or trend counter: decayed counts of the action policy over `actionRange`
+   * are combined with decayed counts of the base policy over `baseRange` through
+   * `reduceRateRankingValue`, keyed by the item's exact key under `policy`.
+   *
+   * Throws NoSuchElementException (synchronously) when the rate action or rate base policy of
+   * `policy` cannot be resolved.
+   */
+  private def getRateScoresAsync(policy: Counter,
+                                 items: Seq[String],
+                                 actionRange: (TimedQualifier, TimedQualifier),
+                                 baseRange: (TimedQualifier, TimedQualifier),
+                                 dimension: Map[String, String],
+                                 qsSum: Option[String]) = {
+    val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).getOrElse(
+      throw new NoSuchElementException(s"rate action policy not found. ${policy.service}.${policy.action}"))
+    val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).getOrElse(
+      throw new NoSuchElementException(s"rate base policy not found. ${policy.service}.${policy.action}"))
+
+    // -1 marks the side (action or base) that a partial value does not carry.
+    val futureAction = getDecayedCountsAsync(actionPolicy, items, actionRange, dimension, qsSum).map { seq =>
+      seq.map { case (k, score) =>
+        ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
+      }.toMap
+    }
+    val futureBase = getDecayedCountsAsync(basePolicy, items, baseRange, dimension, qsSum).map { seq =>
+      seq.map { case (k, score) =>
+        ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
+      }.toMap
+    }
+    futureAction.zip(futureBase).map { case (actionScores, baseScores) =>
+      reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
+        k -> rrv.rankingValue.score
+      }.toSeq
+    }
+  }
+
+  /**
+   * Builds a summed ranking per dimension: all ranked items of the given keys are collected, scored
+   * over the whole time span of the keys, sorted by descending score and cut to `kValue` entries.
+   *
+   * Rate counters divide action by base over the same span; trend counters compare the action span
+   * with the base span shifted back by one step (`add(-1)`); other counters use their own decayed counts.
+   * For BLOB item types the item key is resolved to its blob value, failing when it is missing.
+   * `keys` of every dimension must be non-empty (`head`/`last` are taken).
+   */
+  def getSumRankCounterResultAsync(policy: Counter,
+                                   dimKeys: Seq[(Map[String, String], Seq[RankingKey])],
+                                   kValue: Int,
+                                   qsSum: Option[String]): Future[RankCounterResult] = {
+    val futures = {
+      for {
+        (dimension, keys) <- dimKeys
+      } yield {
+        val tqs = keys.map(rk => rk.eq.tq)
+        // Keys are taken as ordered newest first: the last one starts the span, the first one ends it.
+        val (tqFrom, tqTo) = (tqs.last, tqs.head)
+        val items = rankingCounter(policy.version).getAllItems(keys, kValue)
+        val future = {
+          if (policy.isRateCounter) {
+            getRateScoresAsync(policy, items, (tqFrom, tqTo), (tqFrom, tqTo), dimension, qsSum)
+          }
+          else if (policy.isTrendCounter) {
+            getRateScoresAsync(policy, items, (tqFrom, tqTo), (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum)
+          }
+          else {
+            getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum)
+          }
+        }
+        future.map { keyWithScore =>
+          val ranking = keyWithScore.sortBy(-_._2).take(kValue)
+          val rankCounterItems = {
+            for {
+              idx <- ranking.indices
+              (exactKey, score) = ranking(idx)
+            } yield {
+              val realId = policy.itemType match {
+                case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
+                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}"))
+                case _ => exactKey.itemKey
+              }
+              RankCounterItem(idx + 1, realId, score)
+            }
+          }
+
+          val eq = ExactQualifier(tqFrom, dimension)
+          // -1: no total score is computed for a summed ranking.
+          RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems)
+        }
+      }
+    }
+
+    Future.sequence(futures).map { dimensionResultList =>
+      RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
+    }
+  }
+
+  /**
+   * Builds the (non-summed) ranking result: for every ranking key of every dimension the top
+   * `kValue` entries are read from the ranking storage and numbered from rank 1.
+   * A key without a stored ranking yields an empty item list and a total score of 0.
+   * For BLOB item types the item id is resolved to its blob value, failing when it is missing.
+   */
+  def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = {
+    // The dimension is also carried by each key's qualifier, so the keys alone are sufficient here.
+    val rankingKeys = dimKeys.flatMap { case (_, keys) => keys }
+
+    val dimensionResultList = rankingKeys.map { key =>
+      val topK = rankingCounter(policy.version).getTopK(key, kValue)
+      val ranks = topK.toSeq.flatMap { rValue =>
+        rValue.values.indices.map { idx =>
+          val (id, score) = rValue.values(idx)
+          val realId = policy.itemType match {
+            case ItemType.BLOB =>
+              exactCounter(policy.version)
+                .getBlobValue(policy, id)
+                .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} $id"))
+            case _ => id
+          }
+          RankCounterItem(idx + 1, realId, score)
+        }
+      }
+      val qualifier = key.eq
+      val timed = qualifier.tq
+      val totalScore = topK.map(v => v.totalScore).getOrElse(0d)
+      RankCounterDimensionItem(timed.q.toString, timed.ts, qualifier.dimension, totalScore, ranks)
+    }
+
+    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
+  }
+
+  type Record = ProducerRecord[String, String]
+
+  /**
+   * Enqueues one count event for `item` to the counter Kafka topic.
+   *
+   * Optional JSON body fields read here: `timestamp` (Long, defaults to the current time in
+   * milliseconds), `dimension` and `property` (JSON values, default `{}`).
+   * The message is the tab-separated line `ts, service, action, item, dimension, property`,
+   * keyed by `"$ts.$item"`.
+   *
+   * Responds 200 with the echoed meta data, 404 for an unknown policy, and 400 when reading the
+   * body raises JsResultException.
+   */
+  def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request =>
+    Future {
+      counterModel.findByServiceAction(service, action) match {
+        case None =>
+          NotFound(Json.toJson(
+            Map("msg" -> s"$service.$action not found")
+          ))
+        case Some(_) =>
+          val body = request.body
+          try {
+            val ts = (body \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString
+            val dimension = (body \ "dimension").asOpt[JsValue].getOrElse(Json.obj())
+            val property = (body \ "property").asOpt[JsValue].getOrElse(Json.obj())
+
+            val msg = List(ts, service, action, item, dimension, property).mkString("\t")
+
+            // produce to kafka
+            // hash partitioner by key
+            val record = new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)
+            ExceptionHandler.enqueue(KafkaMessage(record))
+
+            val metaMap = Map("service" -> service, "action" -> action, "item" -> item)
+            Ok(Json.toJson(
+              Map(
+                "meta" -> metaMap
+              )
+            ))
+          }
+          catch {
+            case e: JsResultException =>
+              BadRequest(Json.toJson(
+                Map("msg" -> s"need timestamp.")
+              ))
+          }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
new file mode 100644
index 0000000..6c83743
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -0,0 +1,219 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.rest.play.actors.QueueActor
+import org.apache.s2graph.rest.play.config.Config
+import play.api.libs.json._
+import play.api.mvc.{Controller, Result}
+
+import scala.collection.Seq
+import scala.concurrent.Future
+
+object EdgeController extends Controller {
+
+  import ApplicationController._
+  import ExceptionHandler._
+  import play.api.libs.concurrent.Execution.Implicits._
+
+  private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
+  private val requestParser: RequestParser = 
org.apache.s2graph.rest.play.Global.s2parser
+  /** Returns the raw text of a JsString; any other JSON value is rendered with `toString()`. */
+  private def jsToStr(js: JsValue): String =
+    js match {
+      case JsString(text) => text
+      case other => other.toString()
+    }
+
+  /**
+   * Renders an edge JSON as one tab-separated line with the columns
+   * timestamp, op, "e", from, to, label, props, followed by direction when the JSON
+   * carries a string `direction`. `props` falls back to `{}` when it is not a JSON object.
+   */
+  def toTsv(jsValue: JsValue, op: String): String = {
+    val timestamp = jsToStr(jsValue \ "timestamp")
+    val source = jsToStr(jsValue \ "from")
+    val target = jsToStr(jsValue \ "to")
+    val labelName = jsToStr(jsValue \ "label")
+    val properties = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
+    val directionOpt = (jsValue \ "direction").asOpt[String]
+
+    // direction is appended as a trailing column only when it is present
+    val columns = Seq(timestamp, op, "e", source, target, labelName, properties) ++ directionOpt.toSeq
+    columns.mkString("\t")
+  }
+
+  /**
+   * Parses `jsValue` into edges for `operation`, logs every edge to Kafka and applies the mutations.
+   *
+   * Edges flagged `isAsync` are only logged (to `Config.KAFKA_LOG_TOPIC_ASYNC`); all other edges are
+   * logged to `Config.KAFKA_LOG_TOPIC` and then stored: through `s2.mutateEdges` when `withWait` is
+   * true, otherwise by handing each edge to `QueueActor.router`.
+   *
+   * @param jsValue   request body describing the edges
+   * @param operation operation name passed to the request parser and written to the log line
+   * @param withWait  when true the response carries the result of `s2.mutateEdges`
+   * @return 401 when this is not a write server, 400 on JsonParseException, 500 on any other Exception
+   */
+  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+    else {
+      try {
+        logger.debug(s"$jsValue")
+        val (edges, jsOrgs) = requestParser.toEdgesWithOrg(jsValue, operation)
+
+        for ((edge, orgJs) <- edges.zip(jsOrgs)) {
+          val topic = if (edge.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+          ExceptionHandler.enqueue(toKafkaMessage(topic, edge, Option(toTsv(orgJs, operation))))
+        }
+
+        val edgesToStore = edges.filterNot(e => e.isAsync)
+
+        if (withWait) {
+          val rets = s2.mutateEdges(edgesToStore, withWait = true)
+          rets.map(Json.toJson(_)).map(jsonResponse(_))
+        } else {
+          val rets = edgesToStore.map { edge => QueueActor.router ! edge; true }
+          Future.successful(jsonResponse(Json.toJson(rets)))
+        }
+      } catch {
+        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
+        case e: Exception =>
+          // the full stack trace goes to the log; the response only carries the exception summary
+          // (interpolating e.getStackTrace would print an array reference, not the trace)
+          logger.error(s"tryMutates: $e", e)
+          Future.successful(InternalServerError(s"$e"))
+      }
+    }
+  }
+
+  /**
+   * Parses a newline-separated bulk payload into graph elements, logs each element to Kafka and
+   * applies the mutations.
+   *
+   * Elements flagged `isAsync` are only logged (to `Config.KAFKA_LOG_TOPIC_ASYNC`); the others are
+   * logged to `Config.KAFKA_LOG_TOPIC` and then stored: through `s2.mutateElements` when `withWait`
+   * is true, otherwise by handing each element to `QueueActor.router`.
+   *
+   * @param str      request body, one element per line
+   * @param withWait when true the response carries the result of `s2.mutateElements`
+   * @return 401 when this is not a write server, 400 on JsonParseException, 500 on any other
+   *         non-fatal error
+   */
+  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = {
+    // The guard must be the if-branch of the whole body: as a bare statement its result was
+    // discarded and read-only servers still published and mutated.
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+    else {
+      logger.debug(s"$str")
+      val edgeStrs = str.split("\\n")
+
+      var vertexCnt = 0L
+      var edgeCnt = 0L
+      try {
+        val elements =
+          for (edgeStr <- edgeStrs; line <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(line)) yield {
+            element match {
+              case v: Vertex => vertexCnt += 1
+              case e: Edge => edgeCnt += 1
+            }
+            val topic = if (element.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+            ExceptionHandler.enqueue(toKafkaMessage(topic, element, Some(line)))
+            element
+          }
+
+        //FIXME:
+        val elementsToStore = elements.filterNot(e => e.isAsync)
+        if (withWait) {
+          val rets = s2.mutateElements(elementsToStore, withWait)
+          rets.map(Json.toJson(_)).map(jsonResponse(_))
+        } else {
+          val rets = elementsToStore.map { element => QueueActor.router ! element; true }
+          Future.successful(jsonResponse(Json.toJson(rets)))
+        }
+      } catch {
+        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
+        // NonFatal lets OutOfMemoryError, InterruptedException and friends propagate
+        case scala.util.control.NonFatal(e) =>
+          // the full stack trace goes to the log; the response only carries the exception summary
+          logger.error(s"mutateAndPublish: $e", e)
+          Future.successful(InternalServerError(s"$e"))
+      }
+    }
+  }
+
+  // ---- plain-text bulk endpoints: delegate to mutateAndPublish ----
+
+  /** Bulk mutation from a text body, `withWait = false`. */
+  def mutateBulk() = withHeaderAsync(parse.text)(req => mutateAndPublish(req.body, withWait = false))
+
+  /** Bulk mutation from a text body, `withWait = true`. */
+  def mutateBulkWithWait() = withHeaderAsync(parse.text)(req => mutateAndPublish(req.body, withWait = true))
+
+  // ---- JSON endpoints: delegate to tryMutates with the matching operation name ----
+
+  /** "insert" operation, default `withWait`. */
+  def inserts() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "insert"))
+
+  /** "insert" operation, `withWait = true`. */
+  def insertsWithWait() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "insert", withWait = true))
+
+  /** "insertBulk" operation, default `withWait`. */
+  def insertsBulk() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "insertBulk"))
+
+  /** "delete" operation, default `withWait`. */
+  def deletes() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "delete"))
+
+  /** "delete" operation, `withWait = true`. */
+  def deletesWithWait() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "delete", withWait = true))
+
+  /** "update" operation, default `withWait`. */
+  def updates() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "update"))
+
+  /** "update" operation, `withWait = true`. */
+  def updatesWithWait() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "update", withWait = true))
+
+  /** "increment" operation, default `withWait`. */
+  def increments() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "increment"))
+
+  /** "increment" operation, `withWait = true`. */
+  def incrementsWithWait() = withHeaderAsync(jsonParser)(req => tryMutates(req.body, "increment", withWait = true))
+
+  /**
+   * Parses the JSON body into edges for the "incrementCount" operation, runs
+   * `s2.incrementCounts` with `withWait = true` and answers with one
+   * `{"success": ..., "result": ...}` object per returned pair.
+   */
+  def incrementCounts() = withHeaderAsync(jsonParser) { request =>
+    val edges = requestParser.toEdges(request.body, "incrementCount")
+    val countsFuture = s2.incrementCounts(edges, withWait = true)
+
+    countsFuture.map { results =>
+      val entries = results.map { case (succeeded, count) =>
+        Json.obj("success" -> succeeded, "result" -> count)
+      }
+      jsonResponse(Json.toJson(entries))
+    }
+  }
+
+  /** Runs `deleteAllInner` on the JSON body, always with `withWait = true`. */
+  def deleteAll() = withHeaderAsync(jsonParser)(req => deleteAllInner(req.body, withWait = true))
+
+  /**
+   * Handles a deleteAll request: `jsValue` is read as a sequence of JSON entries, each converted by
+   * `requestParser.toDeleteParam` into (labels, direction, ids, ts, vertices). Entries without
+   * labels or ids count as successful without doing anything; for the rest a log line per
+   * (id, label) pair is enqueued to Kafka and `s2.deleteAllAdjacentEdges` is started.
+   *
+   * @param withWait when false the deletion is still started but its result is not awaited
+   * @return a future `Ok` whose text contains the per-entry results
+   */
+  def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
+
+    /** logging for delete all request: one "deleteAll" TSV line per (id, label) pair */
+    def enqueueLogMessage(ids: Seq[JsValue], labels: Seq[Label], ts: Long, direction: String, topicOpt: Option[String]) = {
+      val kafkaMessages = ids.flatMap { id =>
+        labels.map { label =>
+          // the id is written to both the "from" and the "to" column
+          val idStr = jsToStr(id)
+          val tsv = Seq(ts, "deleteAll", "e", idStr, idStr, label.label, "{}", direction).mkString("\t")
+          val topic = topicOpt.getOrElse {
+            if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+          }
+
+          KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
+        }
+      }
+
+      ExceptionHandler.enqueues(kafkaMessages)
+    }
+
+    /** Logs the request, starts the deletion and decides whether the caller waits for it. */
+    def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue], ts: Long, vertices: Seq[Vertex]) = {
+      enqueueLogMessage(ids, labels, ts, direction, None)
+      val deletion = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
+      if (!withWait) Future.successful(true) else deletion
+    }
+
+    val deleteFutures = jsValue.as[Seq[JsValue]].map { json =>
+      val (labels, direction, ids, ts, vertices) = requestParser.toDeleteParam(json)
+      val nothingToDelete = labels.isEmpty || ids.isEmpty
+      if (nothingToDelete) Future.successful(true)
+      else deleteEach(labels, direction, ids, ts, vertices)
+    }
+
+    Future.sequence(deleteFutures).map { rst =>
+      logger.debug(s"deleteAllInner: $rst")
+      Ok(s"deleted... ${rst.toString()}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
new file mode 100644
index 0000000..b489a81
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/ExperimentController.scala
@@ -0,0 +1,23 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.mysqls.Experiment
+import org.apache.s2graph.core.rest.RestHandler
+import play.api.mvc._
+
+import scala.concurrent.ExecutionContext.Implicits.global
+
+/** Play controller for the experiment endpoint, backed by the RestHandler held in `Global.s2rest`. */
+object ExperimentController extends Controller {
+  private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest
+
+  import ApplicationController._
+
+  /**
+   * Hands the request URI, the raw body and the value of the `Experiment.impressionKey` header to
+   * `rest.doPost`, then answers with the resulting JSON plus the handler's headers and an extra
+   * `result_size` header. Failures are routed to `ApplicationController.requestFallback`.
+   *
+   * The three path parameters are not read in this body; only `request.uri` is forwarded.
+   */
+  def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(jsonText) { request =>
+    val payload = request.body
+    val impressionId = request.headers.get(Experiment.impressionKey)
+    val response = rest.doPost(request.uri, payload, impressionId)
+
+    response.body
+      .map { js =>
+        val allHeaders = response.headers :+ ("result_size" -> rest.calcSize(js).toString)
+        jsonResponse(js, allHeaders: _*)
+      }
+      .recoverWith(ApplicationController.requestFallback(payload))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala
new file mode 100644
index 0000000..42710e1
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/JsonBodyParser.scala
@@ -0,0 +1,86 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.utils.logger
+import play.api.Play
+import play.api.libs.iteratee.Iteratee
+import play.api.libs.json.{JsValue, Json}
+import play.api.mvc._
+
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+/**
+ * Body parsers for JSON requests with a size limit. Invalid JSON is logged together with the
+ * offending body and answered through the application's `onBadRequest` handler.
+ */
+object s2parse extends BodyParsers {
+
+  import parse._
+
+  val defaultMaxTextLength = 1024 * 512
+  val defaultMaxJsonLength = 1024 * 512
+
+  /** JSON parser limited to `defaultMaxJsonLength` bytes. */
+  def json: BodyParser[JsValue] = json(defaultMaxJsonLength)
+
+  /** True when the request declares a `text/json` or `application/json` content type. */
+  private def isJsonContentType(request: RequestHeader): Boolean =
+    request.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json"))
+
+  /**
+    * parseText with application/json header for Pre-Process text:
+    * the body is returned as a raw String, limited to `defaultMaxTextLength` bytes.
+    */
+  def jsonText: BodyParser[String] = when(
+    isJsonContentType,
+    jsonText(defaultMaxTextLength),
+    createBadResult("Expecting text/json or application/json body")
+  )
+
+  /** Reads at most `maxLength` bytes and decodes them as UTF-8; longer bodies yield EntityTooLarge. */
+  private def jsonText(maxLength: Int): BodyParser[String] = BodyParser("json, maxLength=" + maxLength) { request =>
+    import play.api.libs.iteratee.Execution.Implicits.trampoline
+    import play.api.libs.iteratee.Traversable
+
+    Traversable.takeUpTo[Array[Byte]](maxLength)
+      .transform(Iteratee.consume[Array[Byte]]().map(c => new String(c, "UTF-8")))
+      .flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
+  }
+
+  /** JSON parser that requires a JSON content type and accepts at most `maxLength` bytes. */
+  def json(maxLength: Int): BodyParser[JsValue] = when(
+    isJsonContentType,
+    tolerantJson(maxLength),
+    createBadResult("Expecting text/json or application/json body")
+  )
+
+  /** JSON parser that does not check the content type; accepts at most `maxLength` bytes. */
+  def tolerantJson(maxLength: Int): BodyParser[JsValue] =
+    tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, bytes) =>
+      // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, and states that whether that's
+      // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two bytes. So we ignore the declared
+      // charset and don't decode, we passing the byte array as is because Jackson supports auto detection.
+      Json.parse(bytes)
+    }
+
+  /**
+   * Builds a parser that collects up to `maxLength` bytes and runs `parser` on them.
+   * A non-fatal parser failure is logged with the body text and turned into a bad-request result;
+   * a body longer than `maxLength` yields EntityTooLarge.
+   */
+  private def tolerantBodyParser[A](name: String, maxLength: Int, errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] =
+    BodyParser(name + ", maxLength=" + maxLength) { request =>
+      import play.api.libs.iteratee.Execution.Implicits.trampoline
+      import play.api.libs.iteratee.Traversable
+
+      import scala.util.control.Exception._
+
+      val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] =
+        Traversable.takeUpTo[Array[Byte]](maxLength).transform(
+          Iteratee.consume[Array[Byte]]().map { bytes =>
+            allCatch[A].either {
+              parser(request, bytes)
+            }.left.map {
+              case NonFatal(e) =>
+                // decode explicitly as UTF-8 (same as jsonText) instead of the platform default charset
+                val txt = new String(bytes, "UTF-8")
+                logger.error(s"$errorMessage: $txt", e)
+                createBadResult(s"$errorMessage: $e")(request)
+              case t => throw t
+            }
+          }
+        ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
+
+      bodyParser.mapM {
+        case Left(tooLarge) => Future.successful(Left(tooLarge))
+        case Right(Left(badResult)) => badResult.map(Left.apply)
+        case Right(Right(body)) => Future.successful(Right(body))
+      }
+    }
+
+  /** Delegates to the running application's `onBadRequest`; plain BadRequest when no application is running. */
+  private def createBadResult(msg: String): RequestHeader => Future[Result] = { request =>
+    Play.maybeApplication.map(_.global.onBadRequest(request, msg))
+      .getOrElse(Future.successful(Results.BadRequest))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
new file mode 100644
index 0000000..fe478e6
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/PublishController.scala
@@ -0,0 +1,54 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.s2graph.core.ExceptionHandler
+import org.apache.s2graph.rest.play.config.Config
+import play.api.mvc._
+
+import scala.concurrent.Future
+
+/** Play controller that forwards raw request lines to Kafka. */
+object PublishController extends Controller {
+
+  import ApplicationController._
+  import ExceptionHandler._
+  import play.api.libs.concurrent.Execution.Implicits._
+
+  val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.")
+
+  /**
+   * never check validation on string. just redirect strings to kafka.
+   *
+   * Every line of the text body becomes one record on `Config.KAFKA_LOG_TOPIC`; the `topic`
+   * path parameter is currently not used to pick the Kafka topic.
+   *
+   * @return 401 when this is not a write server, otherwise 200 with keep-alive headers
+   */
+  def publishOnly(topic: String) = withHeaderAsync(parse.text) { request =>
+    // The guard has to short-circuit: as a bare statement its value was discarded (and it wrapped
+    // the Int status code UNAUTHORIZED rather than the Unauthorized result), so read-only servers
+    // still published.
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+    else {
+      request.body.split("\n").foreach { line =>
+        val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, line)
+        ExceptionHandler.enqueue(KafkaMessage(keyedMessage))
+      }
+      Future.successful(
+        Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
+      )
+    }
+  }
+
+  /** Alias of `publishOnly`. */
+  def publish(topic: String) = publishOnly(topic)
+
+  /** Bulk mutation of the text body through `EdgeController.mutateAndPublish`; `topic` is not used. */
+  def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
+    EdgeController.mutateAndPublish(request.body)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
new file mode 100644
index 0000000..e509554
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/QueryController.scala
@@ -0,0 +1,52 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.JSONParser
+import org.apache.s2graph.core.mysqls.Experiment
+import org.apache.s2graph.core.rest.RestHandler
+import play.api.libs.json.Json
+import play.api.mvc._
+
+import scala.language.postfixOps
+
+/** Play controller for read queries; every endpoint forwards to the RestHandler in `Global.s2rest`. */
+object QueryController extends Controller with JSONParser {
+
+  import ApplicationController._
+  import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+  private val rest: RestHandler = org.apache.s2graph.rest.play.Global.s2rest
+
+  /**
+   * Passes the request URI, body and `Experiment.impressionKey` header value to `rest.doPost` and
+   * answers with the resulting JSON plus a `result_size` header. Failures are routed to
+   * `ApplicationController.requestFallback`.
+   */
+  def delegate(request: Request[String]) = {
+    val impressionId = request.headers.get(Experiment.impressionKey)
+    val response = rest.doPost(request.uri, request.body, impressionId)
+
+    response.body
+      .map(js => jsonResponse(js, "result_size" -> rest.calcSize(js).toString))
+      .recoverWith(ApplicationController.requestFallback(request.body))
+  }
+
+  // All endpoints below share `delegate`; the handler receives the request URI.
+
+  def getEdges() = withHeaderAsync(jsonText)(delegate)
+
+  def getEdgesWithGrouping() = withHeaderAsync(jsonText)(delegate)
+
+  def getEdgesExcluded() = withHeaderAsync(jsonText)(delegate)
+
+  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonText)(delegate)
+
+  def checkEdges() = withHeaderAsync(jsonText)(delegate)
+
+  def getEdgesGrouped() = withHeaderAsync(jsonText)(delegate)
+
+  def getEdgesGroupedExcluded() = withHeaderAsync(jsonText)(delegate)
+
+  def getEdgesGroupedExcludedFormatted() = withHeaderAsync(jsonText)(delegate)
+
+  /**
+   * Checks a single edge: builds a one-element JSON array from the path parameters and passes it
+   * to `rest.checkEdges`.
+   */
+  def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) =
+    withHeaderAsync(jsonText) { request =>
+      val edgeQuery = Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId)
+
+      rest.checkEdges(Json.arr(edgeQuery)).body
+        .map(js => jsonResponse(js, "result_size" -> rest.calcSize(js).toString))
+        .recoverWith(ApplicationController.requestFallback(request.body))
+    }
+
+  def getVertices() = withHeaderAsync(jsonText)(delegate)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
new file mode 100644
index 0000000..b20fa37
--- /dev/null
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
@@ -0,0 +1,85 @@
+package org.apache.s2graph.rest.play.controllers
+
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphExceptions}
+import org.apache.s2graph.rest.play.actors.QueueActor
+import org.apache.s2graph.rest.play.config.Config
+import play.api.libs.json.{JsValue, Json}
+import play.api.mvc.{Controller, Result}
+
+import scala.concurrent.Future
+
+/** Play controller for vertex mutations ("insert", "delete", "deleteAll"). */
+object VertexController extends Controller {
+  private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
+  private val requestParser: RequestParser = org.apache.s2graph.rest.play.Global.s2parser
+
+  import ApplicationController._
+  import ExceptionHandler._
+  import play.api.libs.concurrent.Execution.Implicits._
+
+  /**
+   * Parses `jsValue` into vertices for `operation`, logs every vertex to Kafka and applies the
+   * mutations.
+   *
+   * Vertices flagged `isAsync` are only logged (to `Config.KAFKA_LOG_TOPIC_ASYNC`); the others are
+   * logged to `Config.KAFKA_LOG_TOPIC` and then stored: through `s2.mutateVertices` when `withWait`
+   * is true, otherwise by handing each vertex to `QueueActor.router`.
+   *
+   * @param serviceNameOpt passed through to `requestParser.toVertices`
+   * @param columnNameOpt  passed through to `requestParser.toVertices`
+   * @return 401 when this is not a write server, 400 on JsonParseException, 500 on any other Exception
+   */
+  def tryMutates(jsValue: JsValue, operation: String, serviceNameOpt: Option[String] = None, columnNameOpt: Option[String] = None, withWait: Boolean = false): Future[Result] = {
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+    else {
+      try {
+        val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)
+
+        for (vertex <- vertices) {
+          val topic = if (vertex.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+          ExceptionHandler.enqueue(toKafkaMessage(topic, vertex, None))
+        }
+
+        //FIXME:
+        val verticesToStore = vertices.filterNot(v => v.isAsync)
+
+        if (withWait) {
+          val rets = s2.mutateVertices(verticesToStore, withWait = true)
+          rets.map(Json.toJson(_)).map(jsonResponse(_))
+        } else {
+          val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
+          Future.successful(jsonResponse(Json.toJson(rets)))
+        }
+      } catch {
+        // s"$e", not s"e": the missing `$` made every parse error answer with the literal "e"
+        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
+        case e: Exception =>
+          // the full stack trace goes to the log; the response only carries the exception summary
+          // (interpolating e.getStackTrace would print an array reference, not the trace)
+          logger.error(s"[Failed] tryMutates", e)
+          Future.successful(InternalServerError(s"$e"))
+      }
+    }
+  }
+
+  /** "insert" operation, default `withWait`. */
+  def inserts() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert")
+  }
+
+  /** "insert" operation, `withWait = true`. */
+  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert", withWait = true)
+  }
+
+  /** "insert" operation with service and column names taken from the route. */
+  def insertsSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert", Some(serviceName), Some(columnName))
+  }
+
+  /** "delete" operation, default `withWait`. */
+  def deletes() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete")
+  }
+
+  /** "delete" operation, `withWait = true`. */
+  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete", withWait = true)
+  }
+
+  /** "delete" operation with service and column names taken from the route. */
+  def deletesSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete", Some(serviceName), Some(columnName))
+  }
+
+  /** "deleteAll" operation, default `withWait`. */
+  def deletesAll() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "deleteAll")
+  }
+
+  /** "deleteAll" operation with service and column names taken from the route. */
+  def deletesAllSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala
new file mode 100644
index 0000000..df10aab
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/models/ExactCounterItem.scala
@@ -0,0 +1,38 @@
+package org.apache.s2graph.rest.play.models
+
+import play.api.libs.json.{Json, Writes}
+
+/**
+ * Created by alec on 15. 4. 15..
+ */
+/** One counter sample: timestamp `ts` with its `count` and `score`. */
+case class ExactCounterItem(ts: Long, count: Long, score: Double)
+
+/** Counter samples for one `interval` and one `dimension` key/value set. */
+case class ExactCounterIntervalItem(interval: String, dimension: Map[String, String], counter: Seq[ExactCounterItem])
+
+/** Identifies the counter a result belongs to. */
+case class ExactCounterResultMeta(service: String, action: String, item: String)
+
+/** Complete exact-counter response: `meta` plus the per-interval `data`. */
+case class ExactCounterResult(meta: ExactCounterResultMeta, data: Seq[ExactCounterIntervalItem])
+
+object ExactCounterItem {
+  // Written with an extra "time" field: `ts` rendered through `tsFormat`.
+  implicit val writes = Writes[ExactCounterItem] { item =>
+    Json.obj(
+      "ts" -> item.ts,
+      "time" -> tsFormat.format(item.ts),
+      "count" -> item.count,
+      "score" -> item.score
+    )
+  }
+  // Reading uses the macro-generated Reads, i.e. only the case-class fields.
+  implicit val reads = Json.reads[ExactCounterItem]
+}
+
+object ExactCounterIntervalItem {
+  implicit val format = Json.format[ExactCounterIntervalItem]
+}
+
+object ExactCounterResultMeta {
+  implicit val format = Json.format[ExactCounterResultMeta]
+}
+
+object ExactCounterResult {
+  implicit val formats = Json.format[ExactCounterResult]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala
new file mode 100644
index 0000000..3d9ef41
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/models/RankCounterItem.scala
@@ -0,0 +1,40 @@
+package org.apache.s2graph.rest.play.models
+
+import play.api.libs.json.{Json, Writes}
+
+/**
+ * Created by alec on 15. 4. 15..
+ */
+/** One ranking entry: position `rank`, item `id` and its `score`. */
+case class RankCounterItem(rank: Int, id: String, score: Double)
+
+/** Ranking for one `interval`, timestamp and `dimension`, with the `total` and the ranked items. */
+case class RankCounterDimensionItem(interval: String, ts: Long, dimension: String, total: Double, ranks: Seq[RankCounterItem])
+
+/** Identifies the counter a ranking result belongs to. */
+case class RankCounterResultMeta(service: String, action: String)
+
+/** Complete ranking response: `meta` plus the per-dimension `data`. */
+case class RankCounterResult(meta: RankCounterResultMeta, data: Seq[RankCounterDimensionItem])
+
+object RankCounterItem {
+  implicit val format = Json.format[RankCounterItem]
+}
+
+object RankCounterDimensionItem {
+  // Written with an extra "time" field: `ts` rendered through `tsFormat`.
+  implicit val writes = Writes[RankCounterDimensionItem] { item =>
+    Json.obj(
+      "interval" -> item.interval,
+      "ts" -> item.ts,
+      "time" -> tsFormat.format(item.ts),
+      "dimension" -> item.dimension,
+      "total" -> item.total,
+      "ranks" -> item.ranks
+    )
+  }
+  // Reading uses the macro-generated Reads, i.e. only the case-class fields.
+  implicit val reads = Json.reads[RankCounterDimensionItem]
+}
+
+object RankCounterResultMeta {
+  implicit val format = Json.format[RankCounterResultMeta]
+}
+
+object RankCounterResult {
+  implicit val format = Json.format[RankCounterResult]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala
new file mode 100644
index 0000000..08b0d45
--- /dev/null
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/models/package.scala
@@ -0,0 +1,10 @@
+package org.apache.s2graph.rest.play
+
+import java.text.SimpleDateFormat
+
+/**
+ * Created by alec on 15. 4. 20..
+ */
+package object models {
+  // Timestamp formatter with the pattern "yyyy-MM-dd HH:mm:ss".
+  // Declared as a `def`, so every call builds a new SimpleDateFormat.
+  // NOTE(review): presumably deliberate, since SimpleDateFormat instances are not thread-safe — confirm.
+  def tsFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/app/util/TestDataLoader.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/util/TestDataLoader.scala 
b/s2rest_play/app/util/TestDataLoader.scala
deleted file mode 100644
index 45a9b61..0000000
--- a/s2rest_play/app/util/TestDataLoader.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-package util
-
-import java.io.File
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer}
-import scala.io.Source
-import scala.util.Random
-
-
-object TestDataLoader {
-  val step = 100
-  val prob = 1.0
-  val (testIds, testIdsHist, testIdsHistCnt) = loadSeeds("./talk_vertices.txt")
-  val maxId = testIds.length
-  //  val randoms = (0 until 100).map{ i => new SecureRandom }
-  //  val idx = new AtomicInteger(0)
-  //  def randomId() = {
-  //    val r = randoms(idx.getAndIncrement() % randoms.size)
-  //    testAccountIds(r.nextInt(maxId))
-  //  }
-  def randomId(histStep: Int) = {
-    for {
-      maxId <- testIdsHistCnt.get(histStep)
-      rIdx = Random.nextInt(maxId.toInt)
-      hist <- testIdsHist.get(histStep)
-      id = hist(rIdx)
-    } yield {
-//      logger.debug(s"randomId: $histStep = $id[$rIdx / $maxId]")
-      id
-    }
-  }
-  def randomId() = {
-    val id = testIds(Random.nextInt(maxId))
-    //    logger.debug(s"$id")
-    id
-  }
-  private def loadSeeds(filePath: String) = {
-    val histogram = new HashMap[Long, ListBuffer[Long]]
-    val histogramCnt = new HashMap[Long, Long]
-    val ids = new ArrayBuffer[Long]
-
-    var idx = 0
-//    logger.debug(s"$filePath start to load file.")
-    for (line <- Source.fromFile(new File(filePath)).getLines) {
-      //      testAccountIds(idx) = line.toLong
-//      if (idx % 10000 == 0) logger.debug(s"$idx")
-      idx += 1
-
-      val parts = line.split("\\t")
-      val id = parts.head.toLong
-      val count = parts.last.toLong / step
-      if (count > 1 && Random.nextDouble < prob) {
-        histogram.get(count) match {
-          case None =>
-            histogram.put(count, new ListBuffer[Long])
-            histogram.get(count).get += id
-            histogramCnt.put(count, 1)
-          case Some(existed) =>
-            existed += id
-            histogramCnt.put(count, histogramCnt.getOrElse(count, 0L) + 1L)
-        }
-        ids += id
-      }
-
-    }
-//    logger.debug(s"upload $filePath finished.")
-//    logger.debug(s"${histogram.size}")
-    (ids, histogram.map(t => (t._1 -> t._2.toArray[Long])), histogramCnt)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/conf/reference.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/reference.conf b/s2rest_play/conf/reference.conf
index 6e76847..452c013 100644
--- a/s2rest_play/conf/reference.conf
+++ b/s2rest_play/conf/reference.conf
@@ -112,7 +112,7 @@ lock.expire.time=600000
 # max allowd edges for deleteAll is multiply of above two configuration.
 
 # set global obejct package, TODO: remove global
-application.global=com.kakao.s2graph.rest.Global
+application.global=org.apache.s2graph.rest.play.Global
 
 akka {
   loggers = ["akka.event.slf4j.Slf4jLogger"]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/conf/routes
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/routes b/s2rest_play/conf/routes
index 90838c8..f0115c9 100644
--- a/s2rest_play/conf/routes
+++ b/s2rest_play/conf/routes
@@ -4,123 +4,104 @@
 
 
 # publish
-#POST   /publish/:topic                                                        
controllers.PublishController.publish(topic)
-POST        /publish/:topic                                               
controllers.PublishController.mutateBulk(topic)
-POST        /publishOnly/:topic                                           
controllers.PublishController.publishOnly(topic)
+POST        /publish/:topic                                               
org.apache.s2graph.rest.play.controllers.PublishController.mutateBulk(topic)
+POST        /publishOnly/:topic                                           
org.apache.s2graph.rest.play.controllers.PublishController.publishOnly(topic)
 
 #### Health Check
-#GET    /health_check.html                                                     
controllers.Assets.at(path="/public", file="health_check.html")
-GET         /health_check.html                                            
controllers.ApplicationController.healthCheck()
-PUT         /health_check/:isHealthy                                      
controllers.ApplicationController.updateHealthCheck(isHealthy: Boolean)
+GET         /health_check.html                                            
org.apache.s2graph.rest.play.controllers.ApplicationController.healthCheck()
+PUT         /health_check/:isHealthy                                      
org.apache.s2graph.rest.play.controllers.ApplicationController.updateHealthCheck(isHealthy:
 Boolean)
 
 ## Edge
-POST        /graphs/edges/insert                                          
controllers.EdgeController.inserts()
-POST        /graphs/edges/insertWithWait                                  
controllers.EdgeController.insertsWithWait()
-POST        /graphs/edges/insertBulk                                      
controllers.EdgeController.insertsBulk()
-POST        /graphs/edges/delete                                          
controllers.EdgeController.deletes()
-POST        /graphs/edges/deleteWithWait                                  
controllers.EdgeController.deletesWithWait()
-POST        /graphs/edges/deleteAll                                       
controllers.EdgeController.deleteAll()
-POST        /graphs/edges/update                                          
controllers.EdgeController.updates()
-POST        /graphs/edges/updateWithWait                                  
controllers.EdgeController.updatesWithWait()
-POST        /graphs/edges/increment                                       
controllers.EdgeController.increments()
-POST        /graphs/edges/incrementWithWait                               
controllers.EdgeController.incrementsWithWait()
-POST        /graphs/edges/incrementCount                                  
controllers.EdgeController.incrementCounts()
-POST        /graphs/edges/bulk                                            
controllers.EdgeController.mutateBulk()
-POST        /graphs/edges/bulkWithWait                                    
controllers.EdgeController.mutateBulkWithWait()
+POST        /graphs/edges/insert                                          
org.apache.s2graph.rest.play.controllers.EdgeController.inserts()
+POST        /graphs/edges/insertWithWait                                  
org.apache.s2graph.rest.play.controllers.EdgeController.insertsWithWait()
+POST        /graphs/edges/insertBulk                                      
org.apache.s2graph.rest.play.controllers.EdgeController.insertsBulk()
+POST        /graphs/edges/delete                                          
org.apache.s2graph.rest.play.controllers.EdgeController.deletes()
+POST        /graphs/edges/deleteWithWait                                  
org.apache.s2graph.rest.play.controllers.EdgeController.deletesWithWait()
+POST        /graphs/edges/deleteAll                                       
org.apache.s2graph.rest.play.controllers.EdgeController.deleteAll()
+POST        /graphs/edges/update                                          
org.apache.s2graph.rest.play.controllers.EdgeController.updates()
+POST        /graphs/edges/updateWithWait                                  
org.apache.s2graph.rest.play.controllers.EdgeController.updatesWithWait()
+POST        /graphs/edges/increment                                       
org.apache.s2graph.rest.play.controllers.EdgeController.increments()
+POST        /graphs/edges/incrementWithWait                               
org.apache.s2graph.rest.play.controllers.EdgeController.incrementsWithWait()
+POST        /graphs/edges/incrementCount                                  
org.apache.s2graph.rest.play.controllers.EdgeController.incrementCounts()
+POST        /graphs/edges/bulk                                            
org.apache.s2graph.rest.play.controllers.EdgeController.mutateBulk()
+POST        /graphs/edges/bulkWithWait                                    
org.apache.s2graph.rest.play.controllers.EdgeController.mutateBulkWithWait()
 
 ## Vertex
-POST        /graphs/vertices/insert                                       
controllers.VertexController.inserts()
-POST        /graphs/vertices/insertWithWait                               
controllers.VertexController.insertsWithWait()
-POST        /graphs/vertices/insert/:serviceName/:columnName              
controllers.VertexController.insertsSimple(serviceName, columnName)
-POST        /graphs/vertices/delete                                       
controllers.VertexController.deletes()
-POST        /graphs/vertices/deleteWithWait                               
controllers.VertexController.deletesWithWait()
-POST        /graphs/vertices/delete/:serviceName/:columnName              
controllers.VertexController.deletesSimple(serviceName, columnName)
-POST        /graphs/vertices/deleteAll                                    
controllers.VertexController.deletesAll()
-POST        /graphs/vertices/deleteAll/:serviceName/:columnName           
controllers.VertexController.deletesAllSimple(serviceName, columnName)
+POST        /graphs/vertices/insert                                       
org.apache.s2graph.rest.play.controllers.VertexController.inserts()
+POST        /graphs/vertices/insertWithWait                               
org.apache.s2graph.rest.play.controllers.VertexController.insertsWithWait()
+POST        /graphs/vertices/insert/:serviceName/:columnName              
org.apache.s2graph.rest.play.controllers.VertexController.insertsSimple(serviceName,
 columnName)
+POST        /graphs/vertices/delete                                       
org.apache.s2graph.rest.play.controllers.VertexController.deletes()
+POST        /graphs/vertices/deleteWithWait                               
org.apache.s2graph.rest.play.controllers.VertexController.deletesWithWait()
+POST        /graphs/vertices/delete/:serviceName/:columnName              
org.apache.s2graph.rest.play.controllers.VertexController.deletesSimple(serviceName,
 columnName)
+POST        /graphs/vertices/deleteAll                                    
org.apache.s2graph.rest.play.controllers.VertexController.deletesAll()
+POST        /graphs/vertices/deleteAll/:serviceName/:columnName           
org.apache.s2graph.rest.play.controllers.VertexController.deletesAllSimple(serviceName,
 columnName)
 
 
 ### SELECT Edges
-POST        /graphs/getEdges                                              
controllers.QueryController.getEdges()
-POST        /graphs/getEdges/grouped                                      
controllers.QueryController.getEdgesWithGrouping()
-POST        /graphs/getEdgesExcluded                                      
controllers.QueryController.getEdgesExcluded()
-POST        /graphs/getEdgesExcluded/grouped                              
controllers.QueryController.getEdgesExcludedWithGrouping()
-POST        /graphs/checkEdges                                            
controllers.QueryController.checkEdges()
+POST        /graphs/getEdges                                              
org.apache.s2graph.rest.play.controllers.QueryController.getEdges()
+POST        /graphs/getEdges/grouped                                      
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesWithGrouping()
+POST        /graphs/getEdgesExcluded                                      
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesExcluded()
+POST        /graphs/getEdgesExcluded/grouped                              
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesExcludedWithGrouping()
+POST        /graphs/checkEdges                                            
org.apache.s2graph.rest.play.controllers.QueryController.checkEdges()
 
 ### this will be deprecated
-POST        /graphs/getEdgesGrouped                                       
controllers.QueryController.getEdgesGrouped()
-POST        /graphs/getEdgesGroupedExcluded                               
controllers.QueryController.getEdgesGroupedExcluded()
-POST        /graphs/getEdgesGroupedExcludedFormatted                      
controllers.QueryController.getEdgesGroupedExcludedFormatted()
-GET         /graphs/getEdge/:srcId/:tgtId/:labelName/:direction           
controllers.QueryController.getEdge(srcId, tgtId, labelName, direction)
+POST        /graphs/getEdgesGrouped                                       
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesGrouped()
+POST        /graphs/getEdgesGroupedExcluded                               
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesGroupedExcluded()
+POST        /graphs/getEdgesGroupedExcludedFormatted                      
org.apache.s2graph.rest.play.controllers.QueryController.getEdgesGroupedExcludedFormatted()
+GET         /graphs/getEdge/:srcId/:tgtId/:labelName/:direction           
org.apache.s2graph.rest.play.controllers.QueryController.getEdge(srcId, tgtId, 
labelName, direction)
 
 
 ### SELECT Vertices
-#POST   /graphs/getVertex                                                      
controllers.QueryController.getVertex()
-POST        /graphs/getVertices                                           
controllers.QueryController.getVertices()
+POST        /graphs/getVertices                                           
org.apache.s2graph.rest.play.controllers.QueryController.getVertices()
 
 
 #### ADMIN
-POST        /graphs/createService                                         
controllers.AdminController.createService()
-GET         /graphs/getService/:serviceName                               
controllers.AdminController.getService(serviceName)
-GET         /graphs/getLabels/:serviceName                                
controllers.AdminController.getLabels(serviceName)
-POST        /graphs/createLabel                                           
controllers.AdminController.createLabel()
-POST        /graphs/addIndex                                              
controllers.AdminController.addIndex()
-GET         /graphs/getLabel/:labelName                                   
controllers.AdminController.getLabel(labelName)
-PUT         /graphs/deleteLabel/:labelName                                
controllers.AdminController.deleteLabel(labelName)
+POST        /graphs/createService                                         
org.apache.s2graph.rest.play.controllers.AdminController.createService()
+GET         /graphs/getService/:serviceName                               
org.apache.s2graph.rest.play.controllers.AdminController.getService(serviceName)
+GET         /graphs/getLabels/:serviceName                                
org.apache.s2graph.rest.play.controllers.AdminController.getLabels(serviceName)
+POST        /graphs/createLabel                                           
org.apache.s2graph.rest.play.controllers.AdminController.createLabel()
+POST        /graphs/addIndex                                              
org.apache.s2graph.rest.play.controllers.AdminController.addIndex()
+GET         /graphs/getLabel/:labelName                                   
org.apache.s2graph.rest.play.controllers.AdminController.getLabel(labelName)
+PUT         /graphs/deleteLabel/:labelName                                
org.apache.s2graph.rest.play.controllers.AdminController.deleteLabel(labelName)
+
+POST        /graphs/addProp/:labelName                                    
org.apache.s2graph.rest.play.controllers.AdminController.addProp(labelName)
+POST        /graphs/createServiceColumn                                   
org.apache.s2graph.rest.play.controllers.AdminController.createServiceColumn()
+PUT         /graphs/deleteServiceColumn/:serviceName/:columnName          
org.apache.s2graph.rest.play.controllers.AdminController.deleteServiceColumn(serviceName,
 columnName)
+POST        /graphs/addServiceColumnProp/:serviceName/:columnName         
org.apache.s2graph.rest.play.controllers.AdminController.addServiceColumnProp(serviceName,
 columnName)
+POST        /graphs/addServiceColumnProps/:serviceName/:columnName        
org.apache.s2graph.rest.play.controllers.AdminController.addServiceColumnProps(serviceName,
 columnName)
+GET         /graphs/getServiceColumn/:serviceName/:columnName             
org.apache.s2graph.rest.play.controllers.AdminController.getServiceColumn(serviceName,
 columnName)
+POST        /graphs/createHTable                                          
org.apache.s2graph.rest.play.controllers.AdminController.createHTable()
 
-POST        /graphs/addProp/:labelName                                    
controllers.AdminController.addProp(labelName)
-POST        /graphs/createServiceColumn                                   
controllers.AdminController.createServiceColumn()
-PUT         /graphs/deleteServiceColumn/:serviceName/:columnName          
controllers.AdminController.deleteServiceColumn(serviceName, columnName)
-POST        /graphs/addServiceColumnProp/:serviceName/:columnName         
controllers.AdminController.addServiceColumnProp(serviceName, columnName)
-POST        /graphs/addServiceColumnProps/:serviceName/:columnName        
controllers.AdminController.addServiceColumnProps(serviceName, columnName)
-GET         /graphs/getServiceColumn/:serviceName/:columnName             
controllers.AdminController.getServiceColumn(serviceName, columnName)
-POST        /graphs/createHTable                                          
controllers.AdminController.createHTable()
 
+# AdminController API
+GET         /admin/labels/:serviceName                                    
org.apache.s2graph.rest.play.controllers.AdminController.getLabels(serviceName)
+POST        /graphs/copyLabel/:oldLabelName/:newLabelName                 
org.apache.s2graph.rest.play.controllers.AdminController.copyLabel(oldLabelName,
 newLabelName)
+POST        /graphs/renameLabel/:oldLabelName/:newLabelName               
org.apache.s2graph.rest.play.controllers.AdminController.renameLabel(oldLabelName,
 newLabelName)
+POST        /graphs/updateHTable/:labelName/:newHTableName                
org.apache.s2graph.rest.play.controllers.AdminController.updateHTable(labelName,
 newHTableName)
+PUT         /graphs/loadCache                                             
org.apache.s2graph.rest.play.controllers.AdminController.loadCache()
+
+
+# Counter Admin API
+POST    /counter/v1/:service/:action                                    
org.apache.s2graph.rest.play.controllers.CounterController.createAction(service,
 action)
+GET     /counter/v1/:service/:action                                    
org.apache.s2graph.rest.play.controllers.CounterController.getAction(service, 
action)
+PUT     /counter/v1/:service/:action                                    
org.apache.s2graph.rest.play.controllers.CounterController.updateAction(service,
 action)
+PUT     /counter/v1/:service/:action/prepare                            
org.apache.s2graph.rest.play.controllers.CounterController.prepareAction(service,
 action)
+DELETE  /counter/v1/:service/:action                                    
org.apache.s2graph.rest.play.controllers.CounterController.deleteAction(service,
 action)
 
+# Counter API
+GET     /counter/v1/:service/:action/ranking                            
org.apache.s2graph.rest.play.controllers.CounterController.getRankingCountAsync(service,
 action)
+DELETE  /counter/v1/:service/:action/ranking                            
org.apache.s2graph.rest.play.controllers.CounterController.deleteRankingCount(service,
 action)
+GET     /counter/v1/:service/:action/:item                              
org.apache.s2graph.rest.play.controllers.CounterController.getExactCountAsync(service,
 action, item)
+PUT     /counter/v1/:service/:action/:item                              
org.apache.s2graph.rest.play.controllers.CounterController.incrementCount(service,
 action, item)
+POST    /counter/v1/mget                                                
org.apache.s2graph.rest.play.controllers.CounterController.getExactCountAsyncMulti()
 
+# Experiment API
+POST        /graphs/experiment/:accessToken/:experimentName/:uuid         
org.apache.s2graph.rest.play.controllers.ExperimentController.experiment(accessToken,
 experimentName, uuid)
 
-#### TEST
-#GET    /graphs/testGetEdges/:label/:limit/:friendCntStep                      
 controllers.QueryController.testGetEdges(label, limit: Int, friendCntStep: Int)
-#GET    /graphs/testGetEdges2/:label1/:limit1/:label2/:limit2                  
 controllers.QueryController.testGetEdges2(label1, limit1: Int, label2, limit2: 
Int)
-#GET    /graphs/testGetEdges3/:label1/:limit1/:label2/:limit2/:label3/:limit3  
 controllers.QueryController.testGetEdges3(label1, limit1: Int, label2, limit2: 
Int, label3, limit3: Int)
-POST        /ping                                                         
controllers.TestController.ping()
-POST        /pingAsync                                                    
controllers.TestController.pingAsync()
-GET         /graphs/testId                                                
controllers.TestController.getRandomId()
 
 # Map static resources from the /public folder to the /assets URL path
 GET         /images/*file                                                 
controllers.Assets.at(path="/public/images", file)
 GET         /javascripts/*file                                            
controllers.Assets.at(path="/public/javascripts", file)
 GET         /stylesheets/*file                                            
controllers.Assets.at(path="/public/stylesheets", file)
 GET         /font-awesome-4.1.0/*file                                     
controllers.Assets.at(path="/public/font-awesome-4.1.0", file)
-GET         /swagger/*file                                                
controllers.Assets.at(path="/public/swagger-ui", file)
-
-
-# AdminController API
-#GET    /admin/services                                                        
 controllers.AdminController.allServices
-GET         /admin/labels/:serviceName                                    
controllers.AdminController.getLabels(serviceName)
-#POST   /admin/labels/delete/:zkAddr/:tableName/:labelIds/:minTs/:maxTs        
 controllers.AdminController.deleteEdges(zkAddr, tableName, labelIds, minTs: 
Long, maxTs: Long)
-#POST   /admin/labels/deleteAll/:zkAddr/:tableName/:minTs/:maxTs               
 controllers.AdminController.deleteAllEdges(zkAddr, tableName, minTs: Long, 
maxTs: Long)
-#POST   /admin/swapLabel/:oldLabelName/:newLabelName                           
 controllers.AdminController.swapLabel(oldLabelName, newLabelName)
-#GET    /admin/reloadLabel/:labelName                                          
 controllers.AdminController.reloadLabel(labelName)
-#POST        /admin/getEdges                                                   
           controllers.AdminController.getEdges()
-POST        /graphs/copyLabel/:oldLabelName/:newLabelName                 
controllers.AdminController.copyLabel(oldLabelName, newLabelName)
-POST        /graphs/renameLabel/:oldLabelName/:newLabelName               
controllers.AdminController.renameLabel(oldLabelName, newLabelName)
-POST        /graphs/updateHTable/:labelName/:newHTableName                
controllers.AdminController.updateHTable(labelName, newHTableName)
-PUT         /graphs/loadCache                                             
controllers.AdminController.loadCache()
-
-
-# Counter Admin API
-POST    /counter/v1/:service/:action                                    
controllers.CounterController.createAction(service, action)
-GET     /counter/v1/:service/:action                                    
controllers.CounterController.getAction(service, action)
-PUT     /counter/v1/:service/:action                                    
controllers.CounterController.updateAction(service, action)
-PUT     /counter/v1/:service/:action/prepare                            
controllers.CounterController.prepareAction(service, action)
-DELETE  /counter/v1/:service/:action                                    
controllers.CounterController.deleteAction(service, action)
-
-# Counter API
-GET     /counter/v1/:service/:action/ranking                            
controllers.CounterController.getRankingCountAsync(service, action)
-DELETE  /counter/v1/:service/:action/ranking                            
controllers.CounterController.deleteRankingCount(service, action)
-GET     /counter/v1/:service/:action/:item                              
controllers.CounterController.getExactCountAsync(service, action, item)
-PUT     /counter/v1/:service/:action/:item                              
controllers.CounterController.incrementCount(service, action, item)
-POST    /counter/v1/mget                                                
controllers.CounterController.getExactCountAsyncMulti()
-
-# Experiment API
-POST        /graphs/experiment/:accessToken/:experimentName/:uuid         
controllers.ExperimentController.experiment(accessToken, experimentName, uuid)
+GET         /swagger/*file                                                
controllers.Assets.at(path="/public/swagger-ui", file)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/BenchmarkCommon.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/BenchmarkCommon.scala 
b/s2rest_play/test/benchmark/BenchmarkCommon.scala
deleted file mode 100644
index 48f84c8..0000000
--- a/s2rest_play/test/benchmark/BenchmarkCommon.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-package benchmark
-
-import org.specs2.mutable.Specification
-
-trait BenchmarkCommon extends Specification {
-  val wrapStr = s"\n=================================================="
-
-  def duration[T](prefix: String = "")(block: => T) = {
-    val startTs = System.currentTimeMillis()
-    val ret = block
-    val endTs = System.currentTimeMillis()
-    println(s"$wrapStr\n$prefix: took ${endTs - startTs} ms$wrapStr")
-    ret
-  }
-
-  def durationWithReturn[T](prefix: String = "")(block: => T): (T, Long) = {
-    val startTs = System.currentTimeMillis()
-    val ret = block
-    val endTs = System.currentTimeMillis()
-    val duration = endTs - startTs
-//    println(s"$wrapStr\n$prefix: took $duration ms$wrapStr")
-    (ret, duration)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2rest_play/test/benchmark/GraphUtilSpec.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/test/benchmark/GraphUtilSpec.scala 
b/s2rest_play/test/benchmark/GraphUtilSpec.scala
deleted file mode 100644
index b5ce93a..0000000
--- a/s2rest_play/test/benchmark/GraphUtilSpec.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-package benchmark
-
-import com.kakao.s2graph.core.{Management, GraphUtil}
-import com.kakao.s2graph.core.types.{SourceVertexId, HBaseType, InnerVal, 
VertexId}
-import org.apache.hadoop.hbase.util.Bytes
-import play.api.test.{FakeApplication, PlaySpecification}
-
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
-
-class GraphUtilSpec extends BenchmarkCommon with PlaySpecification {
-
-  def between(bytes: Array[Byte], startKey: Array[Byte], endKey: Array[Byte]): 
Boolean =
-    Bytes.compareTo(startKey, bytes) <= 0 && Bytes.compareTo(endKey, bytes) >= 0
-
-  def betweenShort(value: Short, start: Short, end: Short): Boolean =
-    start <= value && value <= end
-
-
-  "GraphUtil" should {
-    "test murmur3 hash function distribution" in {
-      val testNum = 1000000
-      val bucketSize = Short.MaxValue / 40
-      val countsNew = new mutable.HashMap[Int, Int]()
-      val counts = new mutable.HashMap[Int, Int]()
-      for {
-        i <- (0 until testNum)
-      } {
-        val h = GraphUtil.murmur3(i.toString) / bucketSize
-        val hNew = GraphUtil.murmur3Int(i.toString) / bucketSize
-        counts += (h -> (counts.getOrElse(h, 0) + 1))
-        countsNew += (hNew -> (countsNew.getOrElse(hNew, 0) + 1))
-      }
-      val all = counts.toList.sortBy { case (bucket, count) => count }.reverse
-      val allNew = countsNew.toList.sortBy { case (bucket, count) => count 
}.reverse
-      val top = all.take(10)
-      val bottom = all.takeRight(10)
-      val topNew = allNew.take(10)
-      val bottomNew = allNew.takeRight(10)
-      println(s"Top: $top")
-      println(s"Bottom: $bottom")
-      println("-" * 50)
-      println(s"TopNew: $topNew")
-      println(s"Bottom: $bottomNew")
-      true
-    }
-
-    "test murmur hash skew2" in {
-      running(FakeApplication()) {
-        import HBaseType._
-        val testNum = 1000000L
-        val regionCount = 40
-        val window = Int.MaxValue / regionCount
-        val rangeBytes = new ListBuffer[(List[Byte], List[Byte])]()
-        for {
-          i <- (0 until regionCount)
-        } yield {
-          val startKey =  Bytes.toBytes(i * window)
-          val endKey = Bytes.toBytes((i + 1) * window)
-          rangeBytes += (startKey.toList -> endKey.toList)
-        }
-
-
-
-        val stats = new collection.mutable.HashMap[Int, ((List[Byte], 
List[Byte]), Long)]()
-        val counts = new collection.mutable.HashMap[Short, Long]()
-        stats += (0 -> (rangeBytes.head -> 0L))
-
-        for (i <- (0L until testNum)) {
-          val vertexId = SourceVertexId(DEFAULT_COL_ID, InnerVal.withLong(i, 
HBaseType.DEFAULT_VERSION))
-          val bytes = vertexId.bytes
-          val shortKey = GraphUtil.murmur3(vertexId.innerId.toIdString())
-          val shortVal = counts.getOrElse(shortKey, 0L) + 1L
-          counts += (shortKey -> shortVal)
-          var j = 0
-          var found = false
-          while (j < rangeBytes.size && !found) {
-            val (start, end) = rangeBytes(j)
-            if (between(bytes, start.toArray, end.toArray)) {
-              found = true
-            }
-            j += 1
-          }
-          val head = rangeBytes(j - 1)
-          val key = j - 1
-          val value = stats.get(key) match {
-            case None => 0L
-            case Some(v) => v._2 + 1
-          }
-          stats += (key -> (head, value))
-        }
-        val sorted = stats.toList.sortBy(kv => kv._2._2).reverse
-        println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ 
EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount")
-        sorted.foreach { case (idx, ((start, end), cnt)) =>
-          val startShort = Bytes.toShort(start.take(2).toArray)
-          val endShort = Bytes.toShort(end.take(2).toArray)
-          val count = counts.count(t => startShort <= t._1 && t._1 < endShort)
-          println(s"$idx: $start ~ $end\t${start.take(2)} ~ 
${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count")
-
-        }
-        println("\n" * 10)
-        println(s"Index: StartBytes ~ EndBytes\tStartShortBytes ~ 
EndShortBytes\tStartShort ~ EndShort\tCount\tShortCount")
-        stats.toList.sortBy(kv => kv._1).reverse.foreach { case (idx, ((start, 
end), cnt)) =>
-          val startShort = Bytes.toShort(start.take(2).toArray)
-          val endShort = Bytes.toShort(end.take(2).toArray)
-          val count = counts.count(t => startShort <= t._1 && t._1 < endShort)
-          println(s"$idx: $start ~ $end\t${start.take(2)} ~ 
${end.take(2)}\t$startShort ~ $endShort\t$cnt\t$count")
-
-        }
-      }
-      true
-    }
-
-    "Bytes compareTo" in {
-      val x = Array[Byte](11, -12, -26, -14, -23)
-      val startKey = Array[Byte](0, 0, 0, 0)
-      val endKey = Array[Byte](12, -52, -52, -52)
-      println(Bytes.compareTo(startKey, x))
-      println(Bytes.compareTo(endKey, x))
-      true
-    }
-  }
-
-}


Reply via email to