http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/AdminController.scala 
b/s2rest_play/app/controllers/AdminController.scala
new file mode 100644
index 0000000..2eb248b
--- /dev/null
+++ b/s2rest_play/app/controllers/AdminController.scala
@@ -0,0 +1,419 @@
+package controllers
+
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.utils.logger
+import play.api.mvc
+import play.api.mvc.{Action, Controller}
+import play.api.libs.json._
+import play.api.libs.functional.syntax._
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Admin REST endpoints for reading and mutating graph metadata: services,
+ * labels, service columns, their props/indices and HTables.
+ *
+ * Endpoints delegate to `Management` or the model objects and render the
+ * outcome as JSON through the `ok` / `bad` / `notFound` helpers below.
+ */
+object AdminController extends Controller {
+
+  import ApplicationController._
+
+  private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser
+
+  /**
+   * Type class that turns an admin response payload into JSON.
+   * @tparam T payload type
+   */
+  trait AdminMessageFormatter[T] {
+    def toJson(msg: T): JsValue
+  }
+
+  object AdminMessageFormatter {
+    /** Any JSON value is passed through unchanged. */
+    implicit def jsValueToJson[T <: JsValue]: AdminMessageFormatter[T] = new AdminMessageFormatter[T] {
+      def toJson(js: T) = js
+    }
+
+    /** A plain string is wrapped as `{"message": ...}`. */
+    implicit val stringToJson: AdminMessageFormatter[String] = new AdminMessageFormatter[String] {
+      def toJson(js: String) = Json.obj("message" -> js)
+    }
+  }
+
+  /**
+   * Converts `message` to JSON and hands it to `f`.
+   *
+   * NOTE(review): `f` returns `play.mvc.Result` while the other helpers in this
+   * object return `play.api.mvc.Result` - confirm this difference is intended.
+   */
+  def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) =
+    f(implicitly[AdminMessageFormatter[T]].toJson(message))
+
+  /**
+   * ok response
+   * @param message payload rendered through its [[AdminMessageFormatter]]
+   * @tparam T payload type
+   * @return 200 response with a JSON content type
+   */
+  def ok[T: AdminMessageFormatter](message: T): mvc.Result =
+    Ok(implicitly[AdminMessageFormatter[T]].toJson(message)).as(applicationJsonHeader)
+
+  /**
+   * bad request response
+   * @param message payload rendered through its [[AdminMessageFormatter]]
+   * @tparam T payload type
+   * @return 400 response with a JSON content type
+   */
+  def bad[T: AdminMessageFormatter](message: T): mvc.Result =
+    BadRequest(implicitly[AdminMessageFormatter[T]].toJson(message)).as(applicationJsonHeader)
+
+  /**
+   * not found response
+   * @param message payload rendered through its [[AdminMessageFormatter]]
+   * @tparam T payload type
+   * @return 404 response with a JSON content type
+   */
+  def notFound[T: AdminMessageFormatter](message: T): mvc.Result =
+    NotFound(implicitly[AdminMessageFormatter[T]].toJson(message)).as(applicationJsonHeader)
+
+  /**
+   * Renders a `Try`: on success the callback result is logged and sent with `ok`;
+   * on failure the error is logged and sent with `bad` (JSON validation errors
+   * are flattened, anything else is reported by message).
+   */
+  private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match {
+    case Success(m) =>
+      val ret = callback(m)
+      logger.info(ret.toString)
+      ok(ret)
+    case Failure(error) =>
+      // getMessage may be null for exceptions created without a message
+      val reason = Option(error.getMessage).getOrElse(error.toString)
+      logger.error(reason, error)
+      error match {
+        case JsResultException(e) => bad(JsError.toFlatJson(e))
+        case _ => bad(reason)
+      }
+  }
+
+  /** Renders an `Option`: `ok` with the callback result when defined, `notFound` otherwise. */
+  def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match {
+    case Some(m) => ok(callback(m))
+    case None => notFound("not found")
+  }
+
+  /**
+   * load all model cache
+   *
+   * The models are only reloaded while `ApplicationController.isHealthy` is false.
+   * @return the elapsed milliseconds as the response message
+   */
+  def loadCache() = Action { request =>
+    val startTs = System.currentTimeMillis()
+
+    if (!ApplicationController.isHealthy) {
+      loadCacheInner()
+    }
+
+    ok((System.currentTimeMillis() - startTs).toString)
+  }
+
+  /** Calls `findAll()` on every metadata model. */
+  def loadCacheInner() = {
+    Service.findAll()
+    ServiceColumn.findAll()
+    Label.findAll()
+    LabelMeta.findAll()
+    LabelIndex.findAll()
+    ColumnMeta.findAll()
+  }
+
+  /**
+   * read
+   */
+
+  /**
+   * get service info
+   * @param serviceName name of the service to look up
+   * @return the service as JSON, or 404
+   */
+  def getService(serviceName: String) = Action { request =>
+    optionResponse(Management.findService(serviceName))(_.toJson)
+  }
+
+  /**
+   * get label info
+   * @param labelName name of the label to look up
+   * @return the label as JSON, or 404
+   */
+  def getLabel(labelName: String) = Action { request =>
+    optionResponse(Management.findLabel(labelName))(_.toJson)
+  }
+
+  /**
+   * get all labels of service
+   * @param serviceName name of the service
+   * @return `{"from": [...], "to": [...]}` with the labels found by source / target service id, or 404
+   */
+  def getLabels(serviceName: String) = Action { request =>
+    Service.findByName(serviceName) match {
+      case None => notFound(s"Service $serviceName not found")
+      case Some(service) =>
+        val src = Label.findBySrcServiceId(service.id.get)
+        val tgt = Label.findByTgtServiceId(service.id.get)
+
+        ok(Json.obj("from" -> src.map(_.toJson), "to" -> tgt.map(_.toJson)))
+    }
+  }
+
+  /**
+   * get service columns
+   * @param serviceName name of the owning service
+   * @param columnName name of the column
+   * @return the service column as JSON, or 404 when the service or column is missing
+   */
+  def getServiceColumn(serviceName: String, columnName: String) = Action { request =>
+    val serviceColumnOpt = for {
+      service <- Service.findByName(serviceName)
+      serviceColumn <- ServiceColumn.find(service.id.get, columnName, useCache = false)
+    } yield serviceColumn
+
+    optionResponse(serviceColumnOpt)(_.toJson)
+  }
+
+  /**
+   * create
+   */
+
+  /**
+   * create service
+   * @return the created service as JSON, or 400 on failure
+   */
+  def createService() = Action(parse.json) { request =>
+    tryResponse(createServiceInner(request.body))(_.toJson)
+  }
+
+  /** Parses the service definition from `jsValue` and passes it to `Management.createService`. */
+  def createServiceInner(jsValue: JsValue) = {
+    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue)
+    Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+  }
+
+  /**
+   * create label
+   * @return the created label as JSON, or 400 on failure
+   */
+  def createLabel() = Action(parse.json) { request =>
+    tryResponse(createLabelInner(request.body))(_.toJson)
+  }
+
+  /** Parses the label definition from `json` and passes it to `Management.createLabel`. */
+  def createLabelInner(json: JsValue) = for {
+    labelArgs <- requestParser.toLabelElements(json)
+    label <- (Management.createLabel _).tupled(labelArgs)
+  } yield label
+
+  /**
+   * add index
+   * @return a message naming the updated label, or 400 on failure
+   */
+  def addIndex() = Action(parse.json) { request =>
+    tryResponse(addIndexInner(request.body))(_.label + " is updated")
+  }
+
+  /** Parses the label name and indices from `json` and passes them to `Management.addIndex`. */
+  def addIndexInner(json: JsValue) = for {
+    (labelName, indices) <- requestParser.toIndexElements(json)
+    label <- Management.addIndex(labelName, indices)
+  } yield label
+
+  /**
+   * create service column
+   * @return `{"metas": [...]}` built from the result of the creation, or 400 on failure
+   */
+  def createServiceColumn() = Action(parse.json) { request =>
+    val serviceColumnTry = createServiceColumnInner(request.body)
+    tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) => Json.obj("metas" -> columns.map(_.toJson)) }
+  }
+
+  /** Parses the column definition from `jsValue` and passes it to `Management.createServiceColumn`. */
+  def createServiceColumnInner(jsValue: JsValue) = for {
+    (serviceName, columnName, columnType, props) <- requestParser.toServiceColumnElements(jsValue)
+    serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props)
+  } yield serviceColumn
+
+  /**
+   * delete
+   */
+
+  /**
+   * delete label
+   * @param labelName name of the label to delete
+   * @return a confirmation message, or 400 on failure
+   */
+  def deleteLabel(labelName: String) = Action { request =>
+    tryResponse(deleteLabelInner(labelName))(deleted => deleted + " is deleted")
+  }
+
+  def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName)
+
+  /**
+   * delete serviceColumn
+   * @param serviceName name of the owning service
+   * @param columnName name of the column to delete
+   * @return a confirmation message, or 400 on failure
+   */
+  def deleteServiceColumn(serviceName: String, columnName: String) = Action { request =>
+    tryResponse(deleteServiceColumnInner(serviceName, columnName))(deleted => deleted + " is deleted")
+  }
+
+  def deleteServiceColumnInner(serviceName: String, columnName: String) =
+    Management.deleteColumn(serviceName, columnName)
+
+  /**
+   * update
+   */
+
+  /**
+   * add Prop to label
+   * @param labelName name of the label receiving the prop
+   * @return the new label meta as JSON, or 400 on failure
+   */
+  def addProp(labelName: String) = Action(parse.json) { request =>
+    tryResponse(addPropInner(labelName, request.body))(_.toJson)
+  }
+
+  /** Parses a prop from `js` and passes it to `Management.addProp`. */
+  def addPropInner(labelName: String, js: JsValue) = for {
+    prop <- requestParser.toPropElements(js)
+    labelMeta <- Management.addProp(labelName, prop)
+  } yield labelMeta
+
+  /**
+   * add prop to serviceColumn
+   * @param serviceName name of the owning service
+   * @param columnName name of the column receiving the prop
+   * @return the column meta as JSON, or 400 when nothing could be added
+   */
+  def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request =>
+    addServiceColumnPropInner(serviceName, columnName)(request.body) match {
+      case None => bad(s"can`t find service with $serviceName or can`t find serviceColumn with $columnName")
+      case Some(m) => Ok(m.toJson).as(applicationJsonHeader)
+    }
+  }
+
+  /**
+   * Finds the service and column, parses a prop from `js` and calls `ColumnMeta.findOrInsert`.
+   * Yields `None` when the service or column is missing or the prop cannot be parsed.
+   */
+  def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = {
+    for {
+      service <- Service.findByName(serviceName)
+      serviceColumn <- ServiceColumn.find(service.id.get, columnName)
+      prop <- requestParser.toPropElements(js).toOption
+    } yield {
+      ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.defaultValue)
+    }
+  }
+
+  /**
+   * add props to serviceColumn
+   *
+   * The body is read as a JSON array of objects; a body of any other shape is treated as empty.
+   * @param serviecName name of the owning service
+   * @param columnName name of the column receiving the props
+   * @return a message with the number of props for which the add step yielded a result
+   */
+  def addServiceColumnProps(serviecName: String, columnName: String) = Action(parse.json) { request =>
+    val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject])
+    val newProps = for {
+      js <- jsObjs
+      newProp <- addServiceColumnPropInner(serviecName, columnName)(js)
+    } yield newProp
+    ok(s"${newProps.size} is added.")
+  }
+
+  /**
+   * copy label
+   * @param oldLabelName name of the label to copy from
+   * @param newLabelName name of the copy
+   * @return a confirmation message, or 400 on failure
+   */
+  def copyLabel(oldLabelName: String, newLabelName: String) = Action { request =>
+    val copyTry = Management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
+    tryResponse(copyTry)(_.label + " created")
+  }
+
+  /**
+   * rename label
+   * @param oldLabelName current label name
+   * @param newLabelName new label name
+   * @return a confirmation message, or 404 when `oldLabelName` does not exist
+   */
+  def renameLabel(oldLabelName: String, newLabelName: String) = Action { request =>
+    Label.findByName(oldLabelName) match {
+      case None => notFound(s"Label $oldLabelName not found")
+      case Some(_) =>
+        Management.updateLabelName(oldLabelName, newLabelName)
+        ok("Label was updated")
+    }
+  }
+
+  /**
+   * update HTable for a label
+   * @param labelName name of the label
+   * @param newHTableName name of the HTable to switch to
+   * @return a message built from the result of `Management.updateHTable`, or 400 on failure
+   */
+  def updateHTable(labelName: String, newHTableName: String) = Action { request =>
+    val updateTry = Management.updateHTable(labelName, newHTableName)
+    tryResponse(updateTry)(updated => s"$updated label(s) updated.")
+  }
+
+
+  /** Request payload of `createHTable`. */
+  case class HTableParams(cluster: String, hTableName: String,
+    preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
+
+    override def toString(): String = {
+      s"""HtableParams
+         |-- cluster : $cluster
+         |-- hTableName : $hTableName
+         |-- preSplitSize : $preSplitSize
+         |-- hTableTTL : $hTableTTL
+         |-- compressionAlgorithm : $compressionAlgorithm
+         |""".stripMargin
+    }
+  }
+
+  /** JSON (de)serialiser for [[HTableParams]]; `hTableTTL` and `compressionAlgorithm` are optional. */
+  implicit object HTableParamsJsonConverter extends Format[HTableParams] {
+    def reads(json: JsValue): JsResult[HTableParams] = (
+      (__ \ "cluster").read[String] and
+      (__ \ "hTableName").read[String] and
+      (__ \ "preSplitSize").read[Int] and
+      (__ \ "hTableTTL").readNullable[Int] and
+      (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
+
+    def writes(o: HTableParams): JsValue = Json.obj(
+      "cluster" -> o.cluster,
+      "hTableName" -> o.hTableName,
+      "preSplitSize" -> o.preSplitSize,
+      "hTableTTL" -> o.hTableTTL,
+      "compressionAlgorithm" -> o.compressionAlgorithm
+    )
+  }
+
+  /** Serialises a `JsError` as `{"errors": [{"path", "validationErrors": [{"message", "args"}]}]}`. */
+  implicit object JsErrorJsonWriter extends Writes[JsError] {
+    def writes(o: JsError): JsValue = Json.obj(
+      "errors" -> JsArray(
+        o.errors.map { case (path, validationErrors) =>
+          Json.obj(
+            "path" -> Json.toJson(path.toString()),
+            "validationErrors" -> JsArray(validationErrors.map { validationError =>
+              Json.obj(
+                "message" -> JsString(validationError.message),
+                // Int arguments stay numeric, everything else is rendered via toString
+                "args" -> JsArray(validationError.args.map {
+                  case x: Int => JsNumber(x)
+                  case x => JsString(x.toString)
+                })
+              )
+            })
+          )
+        }
+      )
+    )
+  }
+
+  /**
+   * Creates an HTable from an [[HTableParams]] JSON body.
+   * @return 200 on success, 400 when the body is not JSON or fails validation
+   */
+  def createHTable() = Action { request =>
+    request.body.asJson.map(_.validate[HTableParams] match {
+      case JsSuccess(hTableParams, _) =>
+        Management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.defaultCompressionAlgorithm))
+        logger.info(hTableParams.toString())
+        ok("HTable was created.")
+      case err@JsError(_) => bad(Json.toJson(err))
+    }).getOrElse(bad("Invalid Json."))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/ApplicationController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ApplicationController.scala 
b/s2rest_play/app/controllers/ApplicationController.scala
new file mode 100644
index 0000000..fefe93e
--- /dev/null
+++ b/s2rest_play/app/controllers/ApplicationController.scala
@@ -0,0 +1,83 @@
+package controllers
+
+import com.kakao.s2graph.core.utils.logger
+import play.api.libs.iteratee.Enumerator
+import play.api.libs.json.{JsString, JsValue}
+import play.api.mvc._
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Health-check endpoints plus response and request-logging helpers shared by
+ * the other controllers.
+ */
+object ApplicationController extends Controller {
+
+  // Runtime flags: `isHealthy` is rewritten by `updateHealthCheck`.
+  var isHealthy = true
+  var deployInfo = ""
+  val applicationJsonHeader = "application/json"
+
+  val jsonParser: BodyParser[JsValue] = controllers.s2parse.json
+
+  /** Sets the health flag and echoes the new value followed by a newline. */
+  def updateHealthCheck(isHealthy: Boolean) = Action { request =>
+    this.isHealthy = isHealthy
+    Ok(s"${this.isHealthy}\n")
+  }
+
+  /** Answers 200 with `deployInfo` while healthy, 404 otherwise. */
+  def healthCheck() = withHeader(parse.anyContent) { request =>
+    if (isHealthy) Ok(deployInfo)
+    else NotFound
+  }
+
+  /**
+   * Builds a 200 JSON response carrying `headers`.
+   *
+   * While the server is flagged unhealthy the response is built by hand with
+   * `HttpConnection.Close` and a `Connection: close` header.
+   *
+   * @param json    response body
+   * @param headers extra response headers
+   */
+  def jsonResponse(json: JsValue, headers: (String, String)*) =
+    if (ApplicationController.isHealthy) {
+      Ok(json).as(applicationJsonHeader).withHeaders(headers: _*)
+    } else {
+      Result(
+        header = ResponseHeader(OK),
+        // encode explicitly: a bare getBytes() uses the platform default charset
+        body = Enumerator(json.toString.getBytes("UTF-8")),
+        connection = HttpConnection.Close
+      ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*)
+    }
+
+  /**
+   * Formats the access-log line for a finished request and, as a side effect,
+   * logs a short `result_size` line of its own.
+   *
+   * @param request   the handled request; a JSON string body is logged unquoted
+   * @param result    the produced result; `result_size` is read from its headers ("-1" when absent)
+   * @param startedAt epoch millis at which handling started
+   * @return "METHOD URI took N ms STATUS RESULT_SIZE BODY"
+   */
+  def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = {
+    val duration = System.currentTimeMillis() - startedAt
+    val resultSize = result.header.headers.getOrElse("result_size", "-1")
+
+    val body = request.body match {
+      case AnyContentAsJson(JsString(str)) => str
+      case AnyContentAsJson(jsValue) => jsValue.toString
+      case other => other.toString
+    }
+
+    logger.info(s"${request.method} ${request.uri} result_size: $resultSize")
+
+    s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}"
+  }
+
+  /** Wraps an asynchronous handler so that every completed request is logged via `toLogMessage`. */
+  def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) =
+    Action.async(bodyParser) { request =>
+      val startedAt = System.currentTimeMillis()
+      block(request).map { r =>
+        logger.info(toLogMessage(request, r)(startedAt))
+        r
+      }
+    }
+
+  /** Wraps a synchronous handler so that every request is logged via `toLogMessage`. */
+  def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) =
+    Action(bodyParser) { request: Request[A] =>
+      val startedAt = System.currentTimeMillis()
+      val r = block(request)
+      logger.info(toLogMessage(request, r)(startedAt))
+      r
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/CounterController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/CounterController.scala 
b/s2rest_play/app/controllers/CounterController.scala
new file mode 100644
index 0000000..c2b4dc2
--- /dev/null
+++ b/s2rest_play/app/controllers/CounterController.scala
@@ -0,0 +1,747 @@
+package controllers
+
+import com.kakao.s2graph.core.ExceptionHandler
+import com.kakao.s2graph.core.ExceptionHandler.KafkaMessage
+import com.kakao.s2graph.core.mysqls.Label
+import config.CounterConfig
+import models._
+import org.apache.kafka.clients.producer.ProducerRecord
+import play.api.Play
+import play.api.libs.json.Reads._
+import play.api.libs.json._
+import play.api.mvc.{Action, Controller, Request}
+import s2.config.S2CounterConfig
+import s2.counter.core.TimedQualifier.IntervalUnit
+import s2.counter.core._
+import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis}
+import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph}
+import s2.models.Counter.ItemType
+import s2.models.{Counter, CounterModel}
+import s2.util.{CartesianProduct, ReduceMapValue, UnitConverter}
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Created by hsleep([email protected]) on 15. 5. 22..
+ */
+object CounterController extends Controller {
+  import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+  // Typesafe config backing the running Play application, and its S2Counter view.
+  val config = Play.current.configuration.underlying
+  val s2config = new S2CounterConfig(config)
+
+  // One exact counter per storage version.
+  private val exactCounterMap = Map(
+    s2.counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
+    s2.counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
+  )
+
+  // One ranking counter per storage version.
+  private val rankingCounterMap = Map(
+    s2.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
+    s2.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
+  )
+
+  // First segment of the generated table name, per storage version.
+  private val tablePrefixMap = Map (
+    s2.counter.VERSION_1 -> "s2counter",
+    s2.counter.VERSION_2 -> "s2counter_v2"
+  )
+
+  // Both lookups throw NoSuchElementException for a version that is not in the maps above.
+  private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
+  private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)
+
+  lazy val counterModel = new CounterModel(config)
+
+  /** Reads query parameter `key` from the implicit request, falling back to `default` when it is absent. */
+  def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String =
+    request.getQueryString(key) match {
+      case Some(value) => value
+      case None => default
+    }
+
+  /**
+   * JSON view of a counter policy. `rateAction` / `rateBase` are expanded from the
+   * stored policy ids to `{"service", "action"}` objects through an uncached lookup.
+   */
+  implicit val counterWrites = new Writes[Counter] {
+    override def writes(o: Counter): JsValue = {
+      val rateAction = for {
+        actionId <- o.rateActionId
+        actionPolicy <- counterModel.findById(actionId, useCache = false)
+      } yield Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action)
+
+      val rateBase = for {
+        baseId <- o.rateBaseId
+        basePolicy <- counterModel.findById(baseId, useCache = false)
+      } yield Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action)
+
+      Json.obj(
+        "version" -> o.version.toInt,
+        "autoComb" -> o.autoComb,
+        "dimension" -> o.dimension,
+        "useProfile" -> o.useProfile,
+        "bucketImpId" -> o.bucketImpId,
+        "useRank" -> o.useRank,
+        "intervalUnit" -> o.intervalUnit,
+        "ttl" -> o.ttl,
+        "dailyTtl" -> o.dailyTtl,
+        "rateAction" -> rateAction,
+        "rateBase" -> rateBase,
+        "rateThreshold" -> o.rateThreshold
+      )
+    }
+  }
+
+  /**
+   * Creates the counter policy for `service`/`action` from the JSON body, unless
+   * one already exists. The exact storage is always prepared; the ranking storage
+   * only when `useRank` is true.
+   *
+   * Body fields read here (all optional): version, autoComb, dimension, useProfile,
+   * bucketImpId, useRank, intervalUnit, ttl (default: 2 days in seconds), dailyTtl,
+   * rateAction, rateBase, rateThreshold, itemType.
+   *
+   * NOTE(review): a `version` outside `tablePrefixMap` and an unknown `itemType`
+   * are not validated here and surface as exceptions.
+   *
+   * @return 200 with a "created" or "already exist" message
+   */
+  def createAction(service: String, action: String) = Action(s2parse.json) { implicit request =>
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case None =>
+        val body = request.body
+        val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(s2.counter.VERSION_2)
+        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true)
+        val dimension = (body \ "dimension").asOpt[String].getOrElse("")
+        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false)
+        val bucketImpId = (body \ "bucketImpId").asOpt[String]
+
+        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true)
+
+        val intervalUnit = (body \ "intervalUnit").asOpt[String]
+        // 2 day
+        val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60)
+        val dailyTtl = (body \ "dailyTtl").asOpt[Int]
+
+        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
+        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
+        val rateThreshold = (body \ "rateThreshold").asOpt[Int]
+
+        // resolves a {"service": ..., "action": ...} reference to the id of an existing policy
+        def referencedPolicyId(ref: Option[Map[String, String]]) = for {
+          fields <- ref
+          refService <- fields.get("service")
+          refAction <- fields.get("action")
+          refPolicy <- counterModel.findByServiceAction(refService, refAction)
+        } yield refPolicy.id
+
+        val rateActionId = referencedPolicyId(rateAction)
+        val rateBaseId = referencedPolicyId(rateBase)
+
+        // <prefix>_<service>_<ttl>[_<dailyTtl>]
+        val hbaseTable = (Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl).mkString("_")
+
+        // the item type comes from the label named like the action, else from the body
+        val itemType = Label.findByName(action, useCache = false) match {
+          case Some(label) =>
+            ItemType.withName(label.tgtColumnType.toUpperCase)
+          case None =>
+            val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING")
+            ItemType.withName(strItemType.toUpperCase)
+        }
+        val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension,
+          useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
+          rateActionId, rateBaseId, rateThreshold)
+
+        // prepare exact storage
+        exactCounter(version).prepare(policy)
+        if (useRank) {
+          // prepare ranking storage
+          rankingCounter(version).prepare(policy)
+        }
+        counterModel.createServiceAction(policy)
+        Ok(Json.toJson(Map("msg" -> s"created $service/$action")))
+      case Some(_) =>
+        Ok(Json.toJson(Map("msg" -> s"already exist $service/$action")))
+    }
+  }
+
+  /** Returns the counter policy of `service`/`action` as JSON (uncached lookup), or 404. */
+  def getAction(service: String, action: String) = Action { request =>
+    counterModel.findByServiceAction(service, action, useCache = false)
+      .map(policy => Ok(Json.toJson(policy)))
+      .getOrElse(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
+  }
+
+  /**
+   * Updates the mutable fields of an existing counter policy. Every field missing
+   * from the JSON body keeps the value of the stored policy; id, useFlag, version,
+   * itemType, ttl, dailyTtl and hbaseTable are always carried over unchanged.
+   *
+   * @return 200 with an "updated" message, or 404 when the policy does not exist
+   */
+  def updateAction(service: String, action: String) = Action(s2parse.json) { request =>
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+      case Some(oldPolicy) =>
+        val body = request.body
+        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb)
+        val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension)
+        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile)
+        val bucketImpId = (body \ "bucketImpId").asOpt[String].orElse(oldPolicy.bucketImpId)
+
+        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank)
+
+        val intervalUnit = (body \ "intervalUnit").asOpt[String].orElse(oldPolicy.intervalUnit)
+
+        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
+        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
+        val rateThreshold = (body \ "rateThreshold").asOpt[Int].orElse(oldPolicy.rateThreshold)
+
+        // resolves a {"service": ..., "action": ...} reference to a policy id, bypassing the cache
+        def referencedPolicyId(ref: Option[Map[String, String]]) = for {
+          fields <- ref
+          refService <- fields.get("service")
+          refAction <- fields.get("action")
+          refPolicy <- counterModel.findByServiceAction(refService, refAction, useCache = false)
+        } yield refPolicy.id
+
+        val rateActionId = referencedPolicyId(rateAction).orElse(oldPolicy.rateActionId)
+        val rateBaseId = referencedPolicyId(rateBase).orElse(oldPolicy.rateBaseId)
+
+        // new counter
+        val policy = Counter(id = oldPolicy.id, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, dimension,
+          useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit,
+          rateActionId, rateBaseId, rateThreshold)
+
+        counterModel.updateServiceAction(policy)
+        Ok(Json.toJson(Map("msg" -> s"updated $service/$action")))
+    }
+  }
+
+  /**
+   * Prepares the storage of another version for an existing policy (for data
+   * migration). The stored policy itself is not modified by this method.
+   *
+   * The body must carry an integer `version`. A missing or unsupported version
+   * is answered with 400 instead of surfacing as an uncaught exception.
+   *
+   * @return 200 with a "prepare" / "already prepared" message, 400 on a bad
+   *         version, 404 when the policy does not exist
+   */
+  def prepareAction(service: String, action: String) = Action(s2parse.json) { request =>
+    // for data migration
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case Some(policy) =>
+        (request.body \ "version").asOpt[Int].map(_.toByte) match {
+          case None =>
+            BadRequest(Json.toJson(Map("msg" -> "version is required and must be an integer")))
+          case Some(version) if version == policy.version =>
+            Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action")))
+          case Some(version) if !tablePrefixMap.contains(version) =>
+            BadRequest(Json.toJson(Map("msg" -> s"unsupported version $version")))
+          case Some(version) =>
+            // change table name
+            val newTableName = (Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl).mkString("_")
+            val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))
+            exactCounter(version).prepare(newPolicy)
+            if (newPolicy.useRank) {
+              rankingCounter(version).prepare(newPolicy)
+            }
+            Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
+        }
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+    }
+  }
+
+  /**
+   * Destroys the exact storage (and the ranking storage when `useRank` is set) of
+   * the policy, then deletes the policy itself. Any exception raised on the way
+   * propagates to the caller.
+   *
+   * @return 200 with a "deleted" message, or 404 when the policy does not exist
+   */
+  def deleteAction(service: String, action: String) = Action {
+    counterModel.findByServiceAction(service, action, useCache = false) match {
+      case Some(policy) =>
+        exactCounter(policy.version).destroy(policy)
+        if (policy.useRank) {
+          rankingCounter(policy.version).destroy(policy)
+        }
+        counterModel.deleteServiceAction(policy)
+        Ok(Json.toJson(Map("msg" -> s"deleted $service/$action")))
+      case None =>
+        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
+    }
+  }
+
+  /**
+   * Returns the counts of one item.
+   *
+   * Query parameters read here:
+   *  - `interval` (falls back to `step`, then "t"): comma separated interval units
+   *  - `limit` (default 1): ignored for the exact counts when both `from` and `to` are given
+   *  - `from`, `to`: converted with `UnitConverter.toMillis`; empty values count as absent
+   *  - `sum`: when present and the first qualifier group has more than one entry,
+   *    the decayed count is returned instead of the exact counts
+   *  - `:<name>`: comma separated values of dimension `<name>`
+   *
+   * @return 200 with the counts as JSON, or 404 when the policy does not exist
+   */
+  def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request =>
+    val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
+      .map(IntervalUnit.withName)
+    val limit = getQueryString("limit", "1").toInt
+
+    val qsSum = request.getQueryString("sum")
+
+    val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
+    val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
+
+    val limitOpt = (optFrom, optTo) match {
+      case (Some(_), Some(_)) =>
+        None
+      case _ =>
+        Some(limit)
+    }
+
+    // find dimension; startsWith (unlike charAt(0)) is safe for an empty parameter name
+    lazy val dimQueryValues = request.queryString.filterKeys(_.startsWith(":")).map { case (k, v) =>
+      (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet)
+    }
+
+    counterModel.findByServiceAction(service, action) match {
+      case Some(policy) =>
+        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
+        val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
+        if (tqs.head.length > 1 && qsSum.isDefined) {
+          getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum).map { jsVal =>
+            Ok(jsVal)
+          }
+        } else {
+          getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues).map { jsVal =>
+            Ok(jsVal)
+          }
+        }
+      case None =>
+        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
+    }
+  }
+
+  /**
+   * Extracts the parameters of one exact-count query from a JSON object shaped like
+   * {
+   *    "service": string, "action": string, "itemIds": [string],
+   *    "intervals": [string], "limit": int, "from": long, "to": long, "sum": string,
+   *    "dimensions": [{"key": [string]}]
+   * }
+   * `service`, `action` and `itemIds` are mandatory; `intervals` defaults to ["t"]
+   * and `limit` to 1.
+   * @return (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
+   */
+  private def parseExactCountParam(jsValue: JsValue) = {
+    val service = (jsValue \ "service").as[String]
+    val action = (jsValue \ "action").as[String]
+    val itemIds = (jsValue \ "itemIds").as[Seq[String]]
+    val intervalNames = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t"))
+    val intervals = intervalNames.distinct.map(IntervalUnit.withName)
+    val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1)
+    val from = (jsValue \ "from").asOpt[Long]
+    val to = (jsValue \ "to").asOpt[Long]
+    val sum = (jsValue \ "sum").asOpt[String]
+    // merge every {"key": [values]} object into one key -> value-set map
+    val dimensions = (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil).flatMap { dimension =>
+      dimension.fields.map { case (k, vs) => k -> vs.as[Seq[String]].toSet }
+    }.toMap
+    (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
+  }
+
+  /**
+   * Batch variant of the exact-count query. The body is a JSON array of objects
+   * in the format read by `parseExactCountParam`; a body of any other shape is
+   * treated as an empty batch. Entries whose policy cannot be found contribute
+   * no result.
+   *
+   * The time range, qualifiers and limit depend only on the entry, so they are
+   * computed once per entry rather than once per item.
+   * NOTE(review): this assumes `TimedQualifier.getTimeRange` has no side effects - confirm.
+   *
+   * @return 200 with a JSON array holding one result per (entry, item) pair
+   */
+  def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request =>
+    val futures = for {
+      jsObject <- request.body.asOpt[List[JsObject]].getOrElse(Nil)
+      (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject)
+      optFrom = from.map(UnitConverter.toMillis)
+      optTo = to.map(UnitConverter.toMillis)
+      timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
+      policy <- counterModel.findByServiceAction(service, action).toSeq
+      tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
+      // an explicit from/to window disables the limit
+      limitOpt = if (optFrom.isDefined && optTo.isDefined) None else Some(limit)
+      item <- itemIds
+    } yield {
+      if (tqs.head.length > 1 && qsSum.isDefined) {
+        getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
+      } else {
+        getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
+      }
+    }
+    Future.sequence(futures).map { rets =>
+      Ok(Json.toJson(rets))
+    }
+  }
+
+  /**
+   * Converts grouped fetched counts into response items, one per
+   * (interval, dimension) group.
+   *
+   * Within each group the counts are ordered by descending timestamp and,
+   * when `limitOpt` is defined, truncated to that many entries.
+   *
+   * @param fetchedCounts counts grouped by (interval, dimension key/values)
+   * @param limitOpt      maximum number of entries kept per group, if any
+   */
+  private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = {
+    fetchedCounts.intervalWithCountMap.map { case ((interval, dimKeyValues), values) =>
+      val newestFirst = values.toSeq.sortBy { case (qualifier, _) => -qualifier.tq.ts }
+      val kept = limitOpt.fold(newestFirst)(n => newestFirst.take(n))
+      val counterItems = kept.map { case (qualifier, count) =>
+        ExactCounterItem(qualifier.tq.ts, count, count.toDouble)
+      }
+      ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems)
+    }.toSeq
+  }
+
+  /**
+   * Converts decayed counts into response items: one interval item per
+   * qualifier, each holding a single counter item with the decayed score
+   * (both truncated to Long and as Double).
+   */
+  private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = {
+    decayedCounts.qualifierWithCountMap.map { case (qualifier, decayedScore) =>
+      val single = ExactCounterItem(qualifier.tq.ts, decayedScore.toLong, decayedScore)
+      ExactCounterIntervalItem(qualifier.tq.q.toString, qualifier.dimKeyValues, Seq(single))
+    }.toSeq
+  }
+
+  /**
+   * Fetches the exact counts of one item and renders them as JSON.
+   *
+   * @param policy         counter policy the item belongs to
+   * @param item           item id to look up
+   * @param timeRange      (from, to) qualifier ranges to fetch
+   * @param limitOpt       per-group entry limit passed on to fetchedToResult
+   * @param dimQueryValues accepted values per dimension key
+   */
+  private def getExactCountToJs(policy: Counter,
+                                item: String,
+                                timeRange: Seq[(TimedQualifier, TimedQualifier)],
+                                limitOpt: Option[Int],
+                                dimQueryValues: Map[String, Set[String]]): Future[JsValue] = {
+    val counter = exactCounter(policy.version)
+    counter.getCountsAsync(policy, Seq(item), timeRange, dimQueryValues).map { fetchedList =>
+      val intervalItems = fetchedList.flatMap(fetched => fetchedToResult(fetched, limitOpt))
+      val meta = ExactCounterResultMeta(policy.service, policy.action, item)
+      Json.toJson(ExactCounterResult(meta, intervalItems))
+    }
+  }
+
+  /**
+   * Fetches the decayed (summed) counts of one item and renders them as JSON.
+   *
+   * Only the first element of the fetched sequence is used; `head` fails the
+   * future if the counter returns an empty sequence.
+   */
+  private def getDecayedCountToJs(policy: Counter,
+                                  item: String,
+                                  timeRange: Seq[(TimedQualifier, TimedQualifier)],
+                                  dimQueryValues: Map[String, Set[String]],
+                                  qsSum: Option[String]): Future[JsValue] = {
+    val counter = exactCounter(policy.version)
+    counter.getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum).map { results =>
+      val first = results.head
+      Json.toJson(ExactCounterResult(
+        ExactCounterResultMeta(policy.service, policy.action, first.exactKey.itemKey),
+        decayedToResult(first)
+      ))
+    }
+  }
+
+  /**
+   * Returns the top-k ranking of a counter.
+   *
+   * Query string: `interval` (or `step`, default "t"), `limit` (default 1),
+   * `k` (default 10), `sum`, `to`, plus any number of dimension filters given
+   * as `:name=v1,v2`. When more than one qualifier is selected and `sum` is
+   * present the summed ranking is computed, otherwise the stored ranking of
+   * each qualifier is returned.
+   *
+   * Responds 404 when the service/action has no counter policy and 501 when
+   * the ranking computation reports UnsupportedOperationException.
+   */
+  def getRankingCountAsync(service: String, action: String) = Action.async { implicit request =>
+    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
+      .map(IntervalUnit.withName)
+    lazy val limit = getQueryString("limit", "1").toInt
+    lazy val kValue = getQueryString("k", "10").toInt
+
+    lazy val qsSum = request.getQueryString("sum")
+
+    lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
+
+    // dimension filters are the query parameters whose name starts with ':';
+    // startsWith (unlike charAt(0)) is safe for an empty parameter name
+    val dimensionMap = request.queryString.collect { case (k, v) if k.startsWith(":") =>
+      (k.substring(1), v.mkString(",").split(',').toList)
+    }
+
+    // every combination of one value per dimension
+    val dimensions = {
+      for {
+        values <- CartesianProduct(dimensionMap.values.toList).toSeq
+      } yield {
+        dimensionMap.keys.zip(values).toMap
+      }
+    }
+
+    def notImplemented(e: UnsupportedOperationException) =
+      NotImplemented(Json.toJson(Map("msg" -> e.getMessage)))
+
+    counterModel.findByServiceAction(service, action) match {
+      case Some(policy) =>
+        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo)
+        val dimKeys = {
+          for {
+            dimension <- dimensions
+          } yield {
+            dimension -> tqs.map(tq => RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension)))
+          }
+        }
+
+        try {
+          val rankResult = {
+            // if tqs has only 1 tq, do not apply sum function
+            if (tqs.length > 1 && qsSum.isDefined) {
+              getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum)
+            } else {
+              // no summary
+              Future.successful(getRankCounterResult(policy, dimKeys, kValue))
+            }
+          }
+
+          rankResult.map { result =>
+            Ok(Json.toJson(result))
+          }.recover {
+            // failure raised asynchronously, inside the future
+            case e: UnsupportedOperationException => notImplemented(e)
+          }
+        } catch {
+          // failure raised synchronously, while the future was being built
+          case e: UnsupportedOperationException =>
+            Future.successful(notImplemented(e))
+        }
+      case None =>
+        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
+    }
+  }
+
+  /**
+   * Deletes the stored rankings of a counter.
+   *
+   * Query string: `interval` (or `step`, default "t"), `limit` (default 1),
+   * plus dimension filters given as `:name=v1,v2`. One ranking key is deleted
+   * per (dimension combination, qualifier); the response lists the deleted
+   * keys. Responds 404 when the service/action has no counter policy.
+   */
+  def deleteRankingCount(service: String, action: String) = Action.async { implicit request =>
+    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
+      .map(IntervalUnit.withName)
+    lazy val limit = getQueryString("limit", "1").toInt
+
+    // dimension filters are the query parameters whose name starts with ':';
+    // startsWith (unlike charAt(0)) is safe for an empty parameter name
+    val dimensionMap = request.queryString.collect { case (k, v) if k.startsWith(":") =>
+      (k.substring(1), v.mkString(",").split(',').toList)
+    }
+
+    // every combination of one value per dimension
+    val dimensions = {
+      for {
+        values <- CartesianProduct(dimensionMap.values.toList).toSeq
+      } yield {
+        dimensionMap.keys.zip(values).toMap
+      }
+    }
+
+    Future {
+      counterModel.findByServiceAction(service, action) match {
+        case Some(policy) =>
+          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit)
+          val keys = {
+            for {
+              dimension <- dimensions
+              tq <- tqs
+            } yield {
+              RankingKey(policy.id, policy.version, ExactQualifier(tq, dimension))
+            }
+          }
+
+          keys.foreach { key =>
+            rankingCounter(policy.version).delete(key)
+          }
+
+          val deleted = keys.map { key =>
+            s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}"
+          }
+          Ok(JsObject(
+            Seq(
+              ("msg", Json.toJson(s"delete ranking in $service.$action")),
+              ("items", Json.toJson(deleted))
+            )
+          ))
+        case None =>
+          NotFound(Json.toJson(
+            Map("msg" -> s"$service.$action not found")
+          ))
+      }
+    }
+  }
+
+  val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, 
RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1)) // combines two key -> RateRankingValue maps via RateRankingValue.reduce; (-1, -1) is presumably the neutral/default value - TODO confirm
+
+  /**
+   * Adapter around the exact counter's getDecayedCountsAsync: takes a single
+   * time range and single-valued dimensions, and flattens the result into
+   * (exact key, decayed score) pairs.
+   */
+  private def getDecayedCountsAsync(policy: Counter,
+                                    items: Seq[String],
+                                    timeRange: (TimedQualifier, TimedQualifier),
+                                    dimension: Map[String, String],
+                                    qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = {
+    // the counter API expects a set of accepted values per dimension key
+    val dimQueryValues = dimension.mapValues(s => Set(s))
+    exactCounter(policy.version)
+      .getDecayedCountsAsync(policy, items, Seq(timeRange), dimQueryValues, qsSum)
+      .map { decayedList =>
+        decayedList.flatMap { case DecayedCounts(exactKey, qcMap) =>
+          qcMap.values.map(score => exactKey -> score)
+        }
+      }
+  }
+
+  /**
+   * Computes, per dimension, the top-k ranking summed over the whole qualifier
+   * range covered by that dimension's ranking keys.
+   *
+   * Candidate items are collected from the stored rankings, then re-scored
+   * with decayed counts:
+   *  - rate counter:  action score vs. base score over the same range
+   *  - trend counter: action score vs. base score over the range shifted by -1
+   *  - otherwise:     the policy's own decayed counts
+   *
+   * @param policy  counter policy being ranked
+   * @param dimKeys per dimension, the ranking keys (one per qualifier)
+   * @param kValue  number of ranks to return
+   * @param qsSum   value of the `sum` query parameter, passed to the counter
+   */
+  def getSumRankCounterResultAsync(policy: Counter,
+                                   dimKeys: Seq[(Map[String, String], Seq[RankingKey])],
+                                   kValue: Int,
+                                   qsSum: Option[String]): Future[RankCounterResult] = {
+    val futures = dimKeys.map { case (dimension, keys) =>
+      val tqs = keys.map(rk => rk.eq.tq)
+      // NOTE(review): assumes keys is non-empty and that its last/first qualifiers
+      // bound the range (callers only reach here with more than one qualifier) - confirm
+      val (tqFrom, tqTo) = (tqs.last, tqs.head)
+      val range = (tqFrom, tqTo)
+      val items = rankingCounter(policy.version).getAllItems(keys, kValue)
+
+      val scores: Future[Seq[(ExactKeyTrait, Double)]] =
+        if (policy.isRateCounter) {
+          getRateScoresAsync(policy, items, range, range, dimension, qsSum)
+        } else if (policy.isTrendCounter) {
+          // trend compares against the base counter one step earlier
+          getRateScoresAsync(policy, items, range, (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum)
+        } else {
+          getDecayedCountsAsync(policy, items, range, dimension, qsSum)
+        }
+
+      scores.map { keyWithScore =>
+        val ranking = keyWithScore.sortBy(-_._2).take(kValue)
+        // zipWithIndex avoids indexed access into a possibly linear Seq
+        val rankCounterItems = ranking.zipWithIndex.map { case ((exactKey, score), idx) =>
+          val realId = policy.itemType match {
+            case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
+              .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}"))
+            case _ => exactKey.itemKey
+          }
+          RankCounterItem(idx + 1, realId, score)
+        }
+
+        val eq = ExactQualifier(tqFrom, dimension)
+        // -1: no total score is available for a summed ranking
+        RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems)
+      }
+    }
+
+    Future.sequence(futures).map { dimensionResultList =>
+      RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
+    }
+  }
+
+  /**
+   * Scores items of a rate/trend counter: fetches the decayed counts of the
+   * linked action and base policies and combines them per item through
+   * reduceRateRankingValue.
+   *
+   * Throws NoSuchElementException when a linked policy cannot be resolved.
+   */
+  private def getRateScoresAsync(policy: Counter,
+                                 items: Seq[String],
+                                 actionRange: (TimedQualifier, TimedQualifier),
+                                 baseRange: (TimedQualifier, TimedQualifier),
+                                 dimension: Map[String, String],
+                                 qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = {
+    val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).getOrElse(
+      throw new NoSuchElementException(s"rate action policy not found. ${policy.service}.${policy.action}"))
+    val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).getOrElse(
+      throw new NoSuchElementException(s"rate base policy not found. ${policy.service}.${policy.action}"))
+
+    // both lookups are started before they are combined, so they run concurrently
+    val futureAction = getDecayedCountsAsync(actionPolicy, items, actionRange, dimension, qsSum).map { seq =>
+      seq.map { case (k, score) =>
+        ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
+      }.toMap
+    }
+    val futureBase = getDecayedCountsAsync(basePolicy, items, baseRange, dimension, qsSum).map { seq =>
+      seq.map { case (k, score) =>
+        ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
+      }.toMap
+    }
+    futureAction.zip(futureBase).map { case (actionScores, baseScores) =>
+      reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
+        k -> rrv.rankingValue.score
+      }.toSeq
+    }
+  }
+
+  /**
+   * Reads the stored top-k ranking of every ranking key, without summing.
+   *
+   * One dimension item is produced per key; a key without a stored ranking
+   * yields an item with no ranks and a total score of 0. For BLOB item types
+   * the ranked id is translated back to the blob value, and a missing blob
+   * value raises an Exception.
+   */
+  def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = {
+    val dimensionResultList = dimKeys.flatMap { case (_, keys) =>
+      keys.map { key =>
+        val topK = rankingCounter(policy.version).getTopK(key, kValue)
+        val ranks = topK.toSeq.flatMap { stored =>
+          stored.values.zipWithIndex.map { case ((id, score), position) =>
+            val realId = policy.itemType match {
+              case ItemType.BLOB =>
+                exactCounter(policy.version)
+                  .getBlobValue(policy, id)
+                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} $id"))
+              case _ => id
+            }
+            // ranks are 1-based
+            RankCounterItem(position + 1, realId, score)
+          }
+        }
+        val qualifier = key.eq
+        val timed = qualifier.tq
+        val total = topK.map(v => v.totalScore).getOrElse(0d)
+        RankCounterDimensionItem(timed.q.toString, timed.ts, qualifier.dimension, total, ranks)
+      }
+    }
+
+    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
+  }
+
+  type Record = ProducerRecord[String, String] // producer record with String key and String value, as built by incrementCount
+
+  /**
+   * Publishes one counter increment event to the counter Kafka topic.
+   *
+   * Optional body fields: `timestamp` (Long, defaults to the current time),
+   * `dimension` and `property` (JSON values, default `{}`). The event is a
+   * tab separated line of timestamp, service, action, item, dimension and
+   * property, keyed by "timestamp.item".
+   *
+   * Responds 404 when the service/action has no counter policy.
+   */
+  def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request =>
+    Future {
+      counterModel.findByServiceAction(service, action) match {
+        case None =>
+          NotFound(Json.toJson(
+            Map("msg" -> s"$service.$action not found")
+          ))
+        case Some(_) =>
+          val json = request.body
+          try {
+            val timestamp = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString
+            val dimension = (json \ "dimension").asOpt[JsValue].getOrElse(Json.obj())
+            val property = (json \ "property").asOpt[JsValue].getOrElse(Json.obj())
+
+            val fields = List(timestamp, service, action, item, dimension, property)
+            val line = fields.mkString("\t")
+
+            // produce to kafka; the key drives the hash partitioner
+            val record = new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$timestamp.$item", line)
+            ExceptionHandler.enqueue(KafkaMessage(record))
+
+            val meta = Map("service" -> service, "action" -> action, "item" -> item)
+            Ok(Json.toJson(
+              Map("meta" -> meta)
+            ))
+          } catch {
+            case e: JsResultException =>
+              BadRequest(Json.toJson(
+                Map("msg" -> "need timestamp.")
+              ))
+          }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/EdgeController.scala 
b/s2rest_play/app/controllers/EdgeController.scala
new file mode 100644
index 0000000..32af6c3
--- /dev/null
+++ b/s2rest_play/app/controllers/EdgeController.scala
@@ -0,0 +1,222 @@
+package controllers
+
+import actors.QueueActor
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.{LabelMeta, Label}
+import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.types.LabelWithDirection
+import com.kakao.s2graph.core.utils.logger
+import config.Config
+import org.apache.kafka.clients.producer.ProducerRecord
+import play.api.libs.json._
+import play.api.mvc.{Controller, Result}
+
+import scala.collection.Seq
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+/**
+ * REST endpoints that mutate edges/vertices.
+ *
+ * Every accepted mutation is first logged to Kafka and then either applied
+ * synchronously through the graph (the `...WithWait` endpoints) or handed to
+ * the QueueActor router for asynchronous processing.
+ */
+object EdgeController extends Controller {
+
+  import ExceptionHandler._
+  import controllers.ApplicationController._
+  import play.api.libs.concurrent.Execution.Implicits._
+
+  import scala.util.control.NonFatal
+
+  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
+  private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser
+
+  /** Renders a stack trace as text (interpolating the array itself only prints its identity). */
+  private def stackTraceOf(e: Throwable): String = e.getStackTrace.mkString("\n")
+
+  /**
+   * Parses edges from JSON and applies `operation` to them.
+   *
+   * @param jsValue   request body describing the edges
+   * @param operation mutation name understood by the request parser
+   * @param withWait  when true, waits for the graph to apply the mutation
+   * @return 401 on a non-write server, 400 on a JSON parse error, 500 on any
+   *         other exception, otherwise the per-edge results as JSON
+   */
+  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+
+    else {
+      try {
+        logger.debug(s"$jsValue")
+        val edges = requestParser.toEdges(jsValue, operation)
+        for (edge <- edges) {
+          val topic = if (edge.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+          ExceptionHandler.enqueue(toKafkaMessage(topic, edge, None))
+        }
+
+        // async edges are only logged, never stored from here
+        val edgesToStore = edges.filterNot(e => e.isAsync)
+
+        if (withWait) {
+          val rets = s2.mutateEdges(edgesToStore, withWait = true)
+          rets.map(Json.toJson(_)).map(jsonResponse(_))
+        } else {
+          val rets = edgesToStore.map { edge => QueueActor.router ! edge; true }
+          Future.successful(jsonResponse(Json.toJson(rets)))
+        }
+      } catch {
+        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
+        case e: Exception =>
+          logger.error(s"tryMutates: $e", e)
+          Future.successful(InternalServerError(stackTraceOf(e)))
+      }
+    }
+  }
+
+  /**
+   * Parses newline separated bulk-format lines into graph elements and
+   * applies them.
+   *
+   * @param str      request body, one element per line
+   * @param withWait when true, waits for the graph to apply the mutation
+   * @return 401 on a non-write server, 400 on a JSON parse error, 500 on any
+   *         other non-fatal error, otherwise the per-element results as JSON
+   */
+  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = {
+    // the guard must be the returned expression; as a bare statement it was ignored
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+
+    else {
+      logger.debug(s"$str")
+      try {
+        val elements =
+          for {
+            edgeStr <- str.split("\\n")
+            line <- GraphUtil.parseString(edgeStr)
+            element <- Graph.toGraphElement(line)
+          } yield {
+            val topic = if (element.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+            ExceptionHandler.enqueue(toKafkaMessage(topic, element, Some(line)))
+            element
+          }
+
+        //FIXME: (left by the original author without further detail)
+        // async elements are only logged, never stored from here
+        val elementsToStore = elements.filterNot(e => e.isAsync)
+        if (withWait) {
+          val rets = s2.mutateElements(elementsToStore, withWait)
+          rets.map(Json.toJson(_)).map(jsonResponse(_))
+        } else {
+          val rets = elementsToStore.map { element => QueueActor.router ! element; true }
+          Future.successful(jsonResponse(Json.toJson(rets)))
+        }
+      } catch {
+        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
+        // NonFatal lets OutOfMemoryError and similar errors propagate
+        case NonFatal(e) =>
+          logger.error(s"mutateAndPublish: $e", e)
+          Future.successful(InternalServerError(stackTraceOf(e)))
+      }
+    }
+  }
+
+  /** Bulk mutation from a plain-text body. */
+  def mutateBulk() = withHeaderAsync(parse.text) { request =>
+    mutateAndPublish(request.body)
+  }
+
+  def inserts() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert")
+  }
+
+  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert", withWait = true)
+  }
+
+  def insertsBulk() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insertBulk")
+  }
+
+  def deletes() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete")
+  }
+
+  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete", withWait = true)
+  }
+
+  def updates() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "update")
+  }
+
+  def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "update", withWait = true)
+  }
+
+  def increments() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "increment")
+  }
+
+  /** Increments edge counts and reports, per edge, the success flag and the resulting count. */
+  def incrementCounts() = withHeaderAsync(jsonParser) { request =>
+    val jsValue = request.body
+    val edges = requestParser.toEdges(jsValue, "incrementCount")
+    s2.incrementCounts(edges).map { results =>
+      val json = results.map { case (isSuccess, resultCount) =>
+        Json.obj("success" -> isSuccess, "result" -> resultCount)
+      }
+
+      jsonResponse(Json.toJson(json))
+    }
+  }
+
+  def deleteAll() = withHeaderAsync(jsonParser) { request =>
+    deleteAllInner(request.body, withWait = false)
+  }
+
+  /**
+   * Deletes all edges adjacent to the given vertices.
+   *
+   * The body is a JSON array of objects with `label`, optional `direction`
+   * (default "out"), optional `ids` and optional `timestamp` (default now).
+   * Each request is logged to Kafka; failed deletions are additionally
+   * published to the fail topic.
+   *
+   * @param withWait when false every request is reported as successful
+   *                 without waiting for the deletion to finish
+   */
+  def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
+    val deleteResults = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
+
+      val labelName = (json \ "label").as[String]
+      val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
+      val direction = (json \ "direction").asOpt[String].getOrElse("out")
+
+      val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
+      val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
+      val vertices = requestParser.toVertices(labelName, direction, ids)
+
+      // one "deleteAll" log line per (id, label), sent to the topic chosen by topicOf
+      def deleteAllMessages(topicOf: Label => String) = for {
+        id <- ids
+        label <- labels
+      } yield {
+        val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t")
+        KafkaMessage(new ProducerRecord[Key, Val](topicOf(label), null, tsv))
+      }
+
+      def publishFailure(): Unit = ExceptionHandler.enqueues(deleteAllMessages(_ => failTopic))
+
+      /** logging for delete all request */
+      ExceptionHandler.enqueues(deleteAllMessages { label =>
+        if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+      })
+
+      val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
+      future.onFailure { case ex: Exception =>
+        logger.error(s"[Error]: deleteAllInner failed.", ex)
+        publishFailure()
+        throw ex
+      }
+      if (withWait) {
+        future.map { ret =>
+          if (!ret) {
+            logger.error(s"[Error]: deleteAllInner failed.")
+            publishFailure()
+          }
+          ret
+        }
+      } else {
+        Future.successful(true)
+      }
+    })
+
+    deleteResults.map { rst =>
+      logger.debug(s"deleteAllInner: $rst")
+      Ok(s"deleted... ${rst.toString()}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/ExperimentController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/ExperimentController.scala 
b/s2rest_play/app/controllers/ExperimentController.scala
new file mode 100644
index 0000000..5ae2379
--- /dev/null
+++ b/s2rest_play/app/controllers/ExperimentController.scala
@@ -0,0 +1,114 @@
+package controllers
+
+
+import java.net.URL
+
+import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.utils.logger
+import play.api.Play.current
+import play.api.libs.json.{JsObject, JsString, JsValue, Json}
+import play.api.libs.ws.WS
+import play.api.mvc._
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+
+/**
+ * A/B experiment endpoint: resolves the bucket a user falls into and runs the
+ * bucket's request, either in-process (graph queries) or by proxying to the
+ * bucket's API path.
+ */
+object ExperimentController extends Controller {
+  val impressionKey = "S2-Impression-Id"
+
+  import ApplicationController._
+
+  /**
+   * Runs the experiment bucket selected for `uuid`.
+   *
+   * @return 404 when service, experiment or bucket cannot be resolved,
+   *         400 when building the bucket request throws
+   */
+  def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(parse.anyContent) { request =>
+    val bucketOpt = for {
+      service <- Service.findByAccessToken(accessToken)
+      experiment <- Experiment.findBy(service.id.get, experimentName)
+      bucket <- experiment.findBucket(uuid)
+    } yield bucket
+
+    bucketOpt match {
+      case None => Future.successful(NotFound("bucket is not found."))
+      case Some(bucket) =>
+        try {
+          if (bucket.isGraphQuery) buildRequestInner(request, bucket, uuid)
+          else buildRequest(request, bucket, uuid)
+        } catch {
+          case e: Exception =>
+            logger.error(e.toString())
+            Future.successful(BadRequest(s"wrong or missing template parameter: ${e.getMessage}"))
+        }
+    }
+  }
+
+  /**
+   * Builds the request JSON from the bucket's body template.
+   *
+   * "#uuid" is replaced by `uuid`; then every field of the (optional) request
+   * JSON object replaces occurrences of its key in the template. String values
+   * are inserted without quotes, other values in their JSON form.
+   * Throws if the substituted template is not valid JSON.
+   */
+  def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
+    val template = bucket.requestBody.replace("#uuid", uuid)
+    val fields = requestKeyJsonOpt.flatMap(_.asOpt[JsObject]).toSeq.flatMap(_.fieldSet)
+
+    val body = fields.foldLeft(template) { case (acc, (key, value)) =>
+      val replacement = value match {
+        case JsString(s) => s
+        case _ => value.toString
+      }
+      acc.replace(key, replacement)
+    }
+
+    Json.parse(body)
+  }
+
+  /** Runs a graph-query bucket in-process by dispatching on the path of its API URL. */
+  private def buildRequestInner(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
+    if (bucket.isEmpty) Future.successful(Ok(Json.obj("isEmpty" -> true)).withHeaders(impressionKey -> bucket.impressionId))
+    else {
+      val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
+      val url = new URL(bucket.apiPath)
+      val path = url.getPath()
+
+      // dummy log for sampling
+      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
+      logger.info(experimentLog)
+
+      val response = path match {
+        case "/graphs/getEdges" => controllers.QueryController.getEdgesInner(jsonBody)
+        case "/graphs/getEdges/grouped" => controllers.QueryController.getEdgesWithGroupingInner(jsonBody)
+        case "/graphs/getEdgesExcluded" => controllers.QueryController.getEdgesExcludedInner(jsonBody)
+        case "/graphs/getEdgesExcluded/grouped" => controllers.QueryController.getEdgesExcludedWithGroupingInner(jsonBody)
+        case "/graphs/checkEdges" => controllers.QueryController.checkEdgesInner(jsonBody)
+        case "/graphs/getEdgesGrouped" => controllers.QueryController.getEdgesGroupedInner(jsonBody)
+        case "/graphs/getEdgesGroupedExcluded" => controllers.QueryController.getEdgesGroupedExcludedInner(jsonBody)
+        case "/graphs/getEdgesGroupedExcludedFormatted" => controllers.QueryController.getEdgesGroupedExcludedFormattedInner(jsonBody)
+        // previously an unknown path surfaced as a MatchError that was reported
+        // as a template problem; answer with an accurate 400 instead
+        case unsupported => Future.successful(BadRequest(s"unsupported api path: $unsupported"))
+      }
+      response.map { r => r.withHeaders(impressionKey -> bucket.impressionId) }
+    }
+  }
+
+  /** Keeps the first value of every multi-valued entry; entries without a value are dropped. */
+  private def toSimpleMap(map: Map[String, Seq[String]]): Map[String, String] = {
+    for {
+      (k, vs) <- map
+      headVal <- vs.headOption
+    } yield {
+      k -> headVal
+    }
+  }
+
+  /**
+   * Proxies the request to the bucket's API path, forwarding headers and query
+   * string, and streams the response back with the impression id header added.
+   */
+  private def buildRequest(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
+    val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
+
+    val url = bucket.apiPath
+    val headers = request.headers.toSimpleMap.toSeq
+    val verb = bucket.httpVerb.toUpperCase
+    val qs = toSimpleMap(request.queryString).toSeq
+
+    val ws = WS.url(url)
+      .withMethod(verb)
+      .withBody(jsonBody)
+      .withHeaders(headers: _*)
+      .withQueryString(qs: _*)
+
+    ws.stream().map {
+      case (proxyResponse, proxyBody) =>
+        // toSimpleMap tolerates a header without values, where `.head` would throw
+        val proxyHeaders = toSimpleMap(proxyResponse.headers)
+        Result(ResponseHeader(proxyResponse.status, proxyHeaders), proxyBody)
+          .withHeaders(impressionKey -> bucket.impressionId)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/JsonBodyParser.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/JsonBodyParser.scala 
b/s2rest_play/app/controllers/JsonBodyParser.scala
new file mode 100644
index 0000000..3e3d40c
--- /dev/null
+++ b/s2rest_play/app/controllers/JsonBodyParser.scala
@@ -0,0 +1,73 @@
+package controllers
+
+import com.kakao.s2graph.core.utils.logger
+import play.api.Play
+import play.api.libs.iteratee.Iteratee
+import play.api.libs.json.{JsValue, Json}
+import play.api.mvc._
+
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
+/**
+ * Created by hsleep([email protected]) on 15. 9. 1..
+ */
+
+/**
+ * JSON body parsers that mirror Play's `parse.json` / `parse.tolerantJson`
+ * with a larger default size limit and logging of bodies that fail to parse.
+ */
+object s2parse extends BodyParsers {
+
+  import parse._
+
+  val defaultMaxTextLength = 1024 * 512
+  val defaultMaxJsonLength = 1024 * 512
+
+  /** JSON parser limited to `defaultMaxJsonLength` bytes. */
+  def json: BodyParser[JsValue] = json(defaultMaxJsonLength)
+
+  /**
+   * Parses the body as JSON when the content type is text/json or
+   * application/json; any other content type yields a bad-request result.
+   */
+  def json(maxLength: Int): BodyParser[JsValue] = when(
+    _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")),
+    tolerantJson(maxLength),
+    createBadResult("Expecting text/json or application/json body")
+  )
+
+  /** Parses the body as JSON regardless of the declared content type. */
+  def tolerantJson(maxLength: Int): BodyParser[JsValue] =
+    tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, bytes) =>
+      // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, and states that whether that's
+      // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two bytes. So we ignore the declared
+      // charset and don't decode; the byte array is passed as is because Jackson supports auto detection.
+      Json.parse(bytes)
+    }
+
+  /**
+   * Builds a body parser that buffers at most `maxLength` bytes and feeds them
+   * to `parser`.
+   *
+   * A larger body yields EntityTooLarge; a non-fatal parser failure is logged
+   * together with the offending body and yields a bad-request result.
+   */
+  private def tolerantBodyParser[A](name: String, maxLength: Int, errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] =
+    BodyParser(name + ", maxLength=" + maxLength) { request =>
+      import play.api.libs.iteratee.Execution.Implicits.trampoline
+      import play.api.libs.iteratee.Traversable
+
+      import scala.util.control.Exception._
+
+      val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] =
+        Traversable.takeUpTo[Array[Byte]](maxLength).transform(
+          Iteratee.consume[Array[Byte]]().map { bytes =>
+            allCatch[A].either {
+              parser(request, bytes)
+            }.left.map {
+              case NonFatal(e) =>
+                // decode explicitly for the log instead of relying on the platform default charset
+                val txt = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
+                logger.error(s"$errorMessage: $txt", e)
+                createBadResult(s"$errorMessage: $e")(request)
+              case t => throw t
+            }
+          }
+        ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
+
+      bodyParser.mapM {
+        case Left(tooLarge) => Future.successful(Left(tooLarge))
+        case Right(Left(badResult)) => badResult.map(Left.apply)
+        case Right(Right(body)) => Future.successful(Right(body))
+      }
+    }
+
+  /** Delegates to the application's global onBadRequest handler, or a bare 400 when no application is running. */
+  private def createBadResult(msg: String): RequestHeader => Future[Result] = { request =>
+    Play.maybeApplication.map(_.global.onBadRequest(request, msg))
+      .getOrElse(Future.successful(Results.BadRequest))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/PublishController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/PublishController.scala 
b/s2rest_play/app/controllers/PublishController.scala
new file mode 100644
index 0000000..b1495d5
--- /dev/null
+++ b/s2rest_play/app/controllers/PublishController.scala
@@ -0,0 +1,54 @@
+package controllers
+
+import com.kakao.s2graph.core.ExceptionHandler
+import config.Config
+import org.apache.kafka.clients.producer.ProducerRecord
+import play.api.mvc._
+
+import scala.concurrent.Future
+
+object PublishController extends Controller {
+
+  import ApplicationController._
+  import ExceptionHandler._
+  import play.api.libs.concurrent.Execution.Implicits._
+
+  val serviceNotExistException = new RuntimeException(s"service is not created in s2graph. create service first.")
+
+  //  private def toService(topic: String): String = {
+  //    Service.findByName(topic).map(service => s"${service.serviceName}-${Config.PHASE}").getOrElse(throw serviceNotExistException)
+  //  }
+
+  /**
+   * Forwards every line of the text body to Kafka without validating it.
+   *
+   * Each line is enqueued as one record on `Config.KAFKA_LOG_TOPIC`; `topic` is currently
+   * not used for routing. Responds with 401 and enqueues nothing when this instance is not
+   * a write server.
+   *
+   * @param topic topic name taken from the route (unused, see the commented `toService`)
+   */
+  def publishOnly(topic: String) = withHeaderAsync(parse.text) { request =>
+    // The guard must be an if/else expression: without the else branch its value was
+    // discarded and the request was published regardless of the server role.
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+    else {
+      request.body.split("\n").foreach { str =>
+        val keyedMessage = new ProducerRecord[Key, Val](Config.KAFKA_LOG_TOPIC, str)
+        ExceptionHandler.enqueue(KafkaMessage(keyedMessage))
+      }
+      Future.successful(
+        Ok("publish success.\n").withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
+      )
+    }
+  }
+
+  /** Alias of [[publishOnly]]. */
+  def publish(topic: String) = publishOnly(topic)
+
+  //  def mutateBulk(topic: String) = Action.async(parse.text) { request =>
+  //    EdgeController.mutateAndPublish(Config.KAFKA_LOG_TOPIC, Config.KAFKA_FAIL_TOPIC, request.body).map { result =>
+  //      result.withHeaders(CONNECTION -> "Keep-Alive", "Keep-Alive" -> "timeout=10, max=10")
+  //    }
+  //  }
+
+  /** Passes the text body to `EdgeController.mutateAndPublish`; `topic` is not used. */
+  def mutateBulk(topic: String) = withHeaderAsync(parse.text) { request =>
+    EdgeController.mutateAndPublish(request.body)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/QueryController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/QueryController.scala 
b/s2rest_play/app/controllers/QueryController.scala
new file mode 100644
index 0000000..480c142
--- /dev/null
+++ b/s2rest_play/app/controllers/QueryController.scala
@@ -0,0 +1,311 @@
+package controllers
+
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.types.{LabelWithDirection, VertexId}
+import com.kakao.s2graph.core.utils.logger
+import config.Config
+import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
+import play.api.mvc.{Action, Controller, Result}
+
+import scala.concurrent._
+import scala.language.postfixOps
+import scala.util.Try
+
+object QueryController extends Controller {
+
+  import ApplicationController._
+  import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
+  private val requestParser = com.kakao.s2graph.rest.Global.s2parser
+
+  /** 400 response whose JSON body carries the exception message under "message". */
+  private def badQueryExceptionResults(ex: Exception) =
+    Future.successful(BadRequest(Json.obj("message" -> ex.getMessage)).as(applicationJsonHeader))
+
+  /** Fallback for unexpected failures: a 200 response with `PostProcess.timeoutResults` as body. */
+  private def errorResults =
+    Future.successful(Ok(PostProcess.timeoutResults).as(applicationJsonHeader))
+
+  /** 401 response returned when this instance is not configured as a query server. */
+  private def notQueryServer: Future[Result] =
+    Future.successful(Unauthorized.as(applicationJsonHeader))
+
+  def getEdges() = withHeaderAsync(jsonParser) { request =>
+    getEdgesInner(request.body)
+  }
+
+  def getEdgesExcluded = withHeaderAsync(jsonParser) { request =>
+    getEdgesExcludedInner(request.body)
+  }
+
+  /**
+   * Runs `q` and, when present, its filter-out query, then renders both result lists with `post`.
+   * The filter-out query is started before the main query is awaited.
+   */
+  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
+    val filterOutQueryResultsLs = q.filterOutQuery match {
+      case Some(filterOutQuery) => s2.getEdges(filterOutQuery)
+      case None => Future.successful(Seq.empty)
+    }
+
+    for {
+      queryResultsLs <- s2.getEdges(q)
+      filterOutResultsLs <- filterOutQueryResultsLs
+    } yield post(queryResultsLs, filterOutResultsLs)
+  }
+
+  /**
+   * Reads the "size" field of a JSON object, or sums the "size" fields of the elements of a
+   * JSON array. Missing or non-integer fields count as 0, as does any other JSON value.
+   */
+  private def calcSize(js: JsValue): Int = js match {
+    case JsObject(_) => (js \ "size").asOpt[Int].getOrElse(0)
+    case JsArray(seq) => seq.map(elem => (elem \ "size").asOpt[Int].getOrElse(0)).sum
+    case _ => 0
+  }
+
+  /**
+   * Parses `jsonQuery` (a single query object or an array of them), executes it and renders
+   * the outcome with `post`.
+   *
+   * Responds with 401 when this instance is not a query server, 400 when parsing raises a
+   * `BadQueryException`, and `errorResults` for any other exception thrown while the query
+   * is being set up. Failures inside the returned future are not handled here.
+   */
+  private def getEdgesAsync(jsonQuery: JsValue)
+                           (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = {
+    // if/else rather than a bare `if`: the guard's value used to be discarded.
+    if (!Config.IS_QUERY_SERVER) notQueryServer
+    else {
+      val fetch = eachQuery(post) _
+
+      Try {
+        val future = jsonQuery match {
+          case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
+          case obj@JsObject(_) => fetch(requestParser.toQuery(obj))
+          case _ => throw BadQueryException("Cannot support")
+        }
+
+        future map { json => jsonResponse(json, "result_size" -> calcSize(json).toString) }
+      }.recover {
+        case e: BadQueryException =>
+          logger.error(s"$jsonQuery, $e", e)
+          badQueryExceptionResults(e)
+        case e: Exception =>
+          logger.error(s"$jsonQuery, $e", e)
+          errorResults
+      }.get
+    }
+  }
+
+  /**
+   * Executes the parsed query together with a second query built from the same vertices and
+   * only the last step, and renders both result lists with `post`.
+   *
+   * Status handling matches [[getEdgesAsync]].
+   */
+  @deprecated(message = "deprecated", since = "0.2")
+  private def getEdgesExcludedAsync(jsonQuery: JsValue)
+                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[Result] = {
+    if (!Config.IS_QUERY_SERVER) notQueryServer
+    else {
+      Try {
+        val q = requestParser.toQuery(jsonQuery)
+        val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
+
+        // Both futures are created before the for-comprehension so they run concurrently.
+        val fetchFuture = s2.getEdges(q)
+        val excludeFuture = s2.getEdges(filterOutQuery)
+
+        for {
+          queryResultLs <- fetchFuture
+          exclude <- excludeFuture
+        } yield {
+          val json = post(queryResultLs, exclude)
+          jsonResponse(json, "result_size" -> calcSize(json).toString)
+        }
+      }.recover {
+        case e: BadQueryException =>
+          logger.error(s"$jsonQuery, $e", e)
+          badQueryExceptionResults(e)
+        case e: Exception =>
+          logger.error(s"$jsonQuery, $e", e)
+          errorResults
+      }.get
+    }
+  }
+
+  def getEdgesInner(jsonQuery: JsValue) = {
+    getEdgesAsync(jsonQuery)(PostProcess.toSimpleVertexArrJson)
+  }
+
+  def getEdgesExcludedInner(jsValue: JsValue) = {
+    getEdgesExcludedAsync(jsValue)(PostProcess.toSimpleVertexArrJson)
+  }
+
+  def getEdgesWithGrouping() = withHeaderAsync(jsonParser) { request =>
+    getEdgesWithGroupingInner(request.body)
+  }
+
+  def getEdgesWithGroupingInner(jsonQuery: JsValue) = {
+    getEdgesAsync(jsonQuery)(PostProcess.summarizeWithListFormatted)
+  }
+
+  def getEdgesExcludedWithGrouping() = withHeaderAsync(jsonParser) { request =>
+    getEdgesExcludedWithGroupingInner(request.body)
+  }
+
+  def getEdgesExcludedWithGroupingInner(jsonQuery: JsValue) = {
+    getEdgesExcludedAsync(jsonQuery)(PostProcess.summarizeWithListExcludeFormatted)
+  }
+
+  def getEdgesGroupedInner(jsonQuery: JsValue) = {
+    getEdgesAsync(jsonQuery)(PostProcess.summarizeWithList)
+  }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def getEdgesGrouped() = withHeaderAsync(jsonParser) { request =>
+    getEdgesGroupedInner(request.body)
+  }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def getEdgesGroupedExcluded() = withHeaderAsync(jsonParser) { request =>
+    getEdgesGroupedExcludedInner(request.body)
+  }
+
+  /** Same flow as [[getEdgesExcludedAsync]], rendered with `PostProcess.summarizeWithListExclude`. */
+  @deprecated(message = "deprecated", since = "0.2")
+  def getEdgesGroupedExcludedInner(jsonQuery: JsValue): Future[Result] =
+    getEdgesExcludedAsync(jsonQuery) { (queryResultLs, exclude) =>
+      PostProcess.summarizeWithListExclude(queryResultLs, exclude)
+    }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def getEdgesGroupedExcludedFormatted = withHeaderAsync(jsonParser) { request =>
+    getEdgesGroupedExcludedFormattedInner(request.body)
+  }
+
+  /** Same flow as [[getEdgesExcludedAsync]], rendered with `PostProcess.summarizeWithListExcludeFormatted`. */
+  @deprecated(message = "deprecated", since = "0.2")
+  def getEdgesGroupedExcludedFormattedInner(jsonQuery: JsValue): Future[Result] =
+    getEdgesExcludedAsync(jsonQuery) { (queryResultLs, exclude) =>
+      PostProcess.summarizeWithListExcludeFormatted(queryResultLs, exclude)
+    }
+
+  /** Looks up a single edge by wrapping the path parameters into the JSON shape `checkEdgesInner` expects. */
+  def getEdge(srcId: String, tgtId: String, labelName: String, direction: String) = Action.async { request =>
+    if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized)
+    else {
+      val params = Json.arr(Json.obj("label" -> labelName, "direction" -> direction, "from" -> srcId, "to" -> tgtId))
+      checkEdgesInner(params)
+    }
+  }
+
+  /**
+   * Vertex
+   */
+
+  /**
+   * Checks the edges described by a JSON array of `{label, direction, from, to}` objects and
+   * responds with the matching edges as JSON.
+   *
+   * Entries whose label, direction or ids cannot be resolved are dropped by the
+   * for-comprehension. Any exception thrown while building the request is logged and
+   * answered with `errorResults`.
+   */
+  def checkEdgesInner(jsValue: JsValue) = {
+    try {
+      val params = jsValue.as[List[JsValue]]
+      // NOTE(review): this flag is shared by every requested edge, so one entry with
+      // direction 1 makes all returned edges go through `duplicateEdge` - confirm that
+      // this is intended for multi-edge requests.
+      var isReverted = false
+      val quads = for {
+        param <- params
+        labelName <- (param \ "label").asOpt[String]
+        direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out"))
+        label <- Label.findByName(labelName)
+        srcId <- requestParser.jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
+        tgtId <- requestParser.jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
+      } yield {
+          // For direction 1 the endpoints are swapped; the query direction is 0 either way.
+          val (src, tgt, dir) = if (direction == 1) {
+            isReverted = true
+            (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)),
+              Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0)
+          } else {
+            (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)),
+              Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0)
+          }
+
+          (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
+        }
+
+      s2.checkEdges(quads).map { queryRequestWithResultLs =>
+        val edgeJsons = for {
+          queryRequestWithResult <- queryRequestWithResultLs
+          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
+          edgeWithScore <- queryResult.edgeWithScoreLs
+          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          convertedEdge = if (isReverted) edge.duplicateEdge else edge
+          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
+        } yield Json.toJson(edgeJson)
+
+        val json = Json.toJson(edgeJsons)
+        jsonResponse(json, "result_size" -> edgeJsons.size.toString)
+      }
+    } catch {
+      case e: Exception =>
+        logger.error(s"$jsValue, $e", e)
+        errorResults
+    }
+  }
+
+  def checkEdges() = withHeaderAsync(jsonParser) { request =>
+    if (!Config.IS_QUERY_SERVER) Future.successful(Unauthorized)
+    else checkEdgesInner(request.body)
+  }
+
+  def getVertices() = withHeaderAsync(jsonParser) { request =>
+    getVerticesInner(request.body)
+  }
+
+  /**
+   * Fetches the vertices named by a JSON array of `{serviceName, columnName, ids}` objects.
+   *
+   * Responds with 401 when this instance is not a query server, 400 when a required field
+   * is missing or mistyped (`JsResultException`), and `errorResults` for any other exception
+   * thrown while the request is being built.
+   */
+  def getVerticesInner(jsValue: JsValue): Future[Result] = {
+    if (!Config.IS_QUERY_SERVER) notQueryServer
+    else {
+      val jsonQuery = jsValue
+      val ts = System.currentTimeMillis()
+      val props = "{}"
+
+      Try {
+        val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
+          val serviceName = (js \ "serviceName").as[String]
+          val columnName = (js \ "columnName").as[String]
+          for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
+            Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
+          }
+        }
+
+        s2.getVertices(vertices) map { vertices =>
+          val json = PostProcess.verticesToJson(vertices)
+          jsonResponse(json, "result_size" -> calcSize(json).toString)
+        }
+      }.recover {
+        case e: play.api.libs.json.JsResultException =>
+          logger.error(s"$jsonQuery, $e", e)
+          badQueryExceptionResults(e)
+        case e: Exception =>
+          logger.error(s"$jsonQuery, $e", e)
+          errorResults
+      }.get
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/TestController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/TestController.scala 
b/s2rest_play/app/controllers/TestController.scala
new file mode 100644
index 0000000..8558ae5
--- /dev/null
+++ b/s2rest_play/app/controllers/TestController.scala
@@ -0,0 +1,24 @@
+package controllers
+
+import play.api.mvc.{Action, Controller}
+import util.TestDataLoader
+
+import scala.concurrent.Future
+
+
+object TestController extends Controller {
+  import ApplicationController._
+
+  /** Responds with one random id from `TestDataLoader`, rendered as text. */
+  def getRandomId() = withHeader(parse.anyContent) { _ =>
+    val picked = TestDataLoader.randomId()
+    Ok(picked.toString)
+  }
+
+  /** Asynchronous liveness check; the request must carry a JSON body. */
+  def pingAsync() = Action.async(parse.json) { _ =>
+    val pong = Ok("Pong\n")
+    Future.successful(pong)
+  }
+
+  /** Synchronous liveness check; the request must carry a JSON body. */
+  def ping() = Action(parse.json)(_ => Ok("Pong\n"))
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/VertexController.scala 
b/s2rest_play/app/controllers/VertexController.scala
new file mode 100644
index 0000000..977c2fc
--- /dev/null
+++ b/s2rest_play/app/controllers/VertexController.scala
@@ -0,0 +1,86 @@
+package controllers
+
+
+import actors.QueueActor
+import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.utils.logger
+import com.kakao.s2graph.core.{ExceptionHandler, Graph, GraphExceptions}
+import config.Config
+import play.api.libs.json.{JsValue, Json}
+import play.api.mvc.{Controller, Result}
+
+import scala.concurrent.Future
+
+object VertexController extends Controller {
+  private val s2: Graph = com.kakao.s2graph.rest.Global.s2graph
+  private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser
+
+  import ExceptionHandler._
+  import controllers.ApplicationController._
+  import play.api.libs.concurrent.Execution.Implicits._
+
+  /**
+   * Parses `jsValue` into vertices, publishes every vertex to Kafka and applies the
+   * non-async ones to the graph.
+   *
+   * @param jsValue        request body describing the vertices
+   * @param operation      operation name passed to the request parser
+   * @param serviceNameOpt service name passed to the request parser, if any
+   * @param columnNameOpt  column name passed to the request parser, if any
+   * @param withWait       when true the response waits for `s2.mutateVertices`; otherwise the
+   *                       vertices are handed to `QueueActor.router` and `true` is reported
+   *                       for each of them
+   * @return 401 when this instance is not a write server, 400 with the exception text when
+   *         parsing fails with `JsonParseException`, 500 with the stack trace for any other
+   *         exception, and the JSON-encoded results otherwise
+   */
+  def tryMutates(jsValue: JsValue, operation: String, serviceNameOpt: Option[String] = None, columnNameOpt: Option[String] = None, withWait: Boolean = false): Future[Result] = {
+    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
+    else {
+      try {
+        val vertices = requestParser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)
+
+        for (vertex <- vertices) {
+          val topic = if (vertex.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
+          ExceptionHandler.enqueue(toKafkaMessage(topic, vertex, None))
+        }
+
+        //FIXME:
+        val verticesToStore = vertices.filterNot(v => v.isAsync)
+
+        if (withWait) {
+          val rets = s2.mutateVertices(verticesToStore, withWait = true)
+          rets.map(Json.toJson(_)).map(jsonResponse(_))
+        } else {
+          val rets = verticesToStore.map { vertex => QueueActor.router ! vertex; true }
+          Future.successful(jsonResponse(Json.toJson(rets)))
+        }
+      } catch {
+        // Was `s"e"`, which always answered with the literal text "e".
+        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
+        case e: Exception =>
+          logger.error(s"[Failed] tryMutates", e)
+          // Interpolating the array itself only printed its reference; join the frames instead.
+          Future.successful(InternalServerError(e.getStackTrace.mkString("\n")))
+      }
+    }
+  }
+
+  def inserts() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert")
+  }
+
+  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert", withWait = true)
+  }
+
+  def insertsSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "insert", Some(serviceName), Some(columnName))
+  }
+
+  def deletes() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete")
+  }
+
+  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete", withWait = true)
+  }
+
+  def deletesSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "delete", Some(serviceName), Some(columnName))
+  }
+
+  def deletesAll() = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "deleteAll")
+  }
+
+  def deletesAllSimple(serviceName: String, columnName: String) = withHeaderAsync(jsonParser) { request =>
+    tryMutates(request.body, "deleteAll", Some(serviceName), Some(columnName))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/models/ExactCounterItem.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/models/ExactCounterItem.scala 
b/s2rest_play/app/models/ExactCounterItem.scala
new file mode 100644
index 0000000..244c046
--- /dev/null
+++ b/s2rest_play/app/models/ExactCounterItem.scala
@@ -0,0 +1,38 @@
+package models
+
+import play.api.libs.json.{Json, Writes}
+
+/**
+ * Created by alec on 15. 4. 15..
+ */
+/** One counter sample; `ts` is also rendered as a formatted "time" field when written as JSON. */
+case class ExactCounterItem(ts: Long, count: Long, score: Double)
+
+/** The samples of one interval/dimension combination. */
+case class ExactCounterIntervalItem(interval: String, dimension: Map[String, String], counter: Seq[ExactCounterItem])
+
+case class ExactCounterResultMeta(service: String, action: String, item: String)
+
+case class ExactCounterResult(meta: ExactCounterResultMeta, data: Seq[ExactCounterIntervalItem])
+
+// Implicit instances carry explicit types so their resolution does not depend on inference order.
+object ExactCounterItem {
+  implicit val writes: Writes[ExactCounterItem] = new Writes[ExactCounterItem] {
+    def writes(item: ExactCounterItem) = Json.obj(
+      "ts" -> item.ts,
+      "time" -> tsFormat.format(item.ts),
+      "count" -> item.count,
+      "score" -> item.score
+    )
+  }
+  implicit val reads: play.api.libs.json.Reads[ExactCounterItem] = Json.reads[ExactCounterItem]
+}
+
+object ExactCounterIntervalItem {
+  implicit val format: play.api.libs.json.Format[ExactCounterIntervalItem] = Json.format[ExactCounterIntervalItem]
+}
+
+object ExactCounterResultMeta {
+  implicit val format: play.api.libs.json.Format[ExactCounterResultMeta] = Json.format[ExactCounterResultMeta]
+}
+
+object ExactCounterResult {
+  implicit val formats: play.api.libs.json.Format[ExactCounterResult] = Json.format[ExactCounterResult]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/models/RankCounterItem.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/models/RankCounterItem.scala 
b/s2rest_play/app/models/RankCounterItem.scala
new file mode 100644
index 0000000..aaa7df7
--- /dev/null
+++ b/s2rest_play/app/models/RankCounterItem.scala
@@ -0,0 +1,40 @@
+package models
+
+import play.api.libs.json.{Json, Writes}
+
+/**
+ * Created by alec on 15. 4. 15..
+ */
+/** One ranked entry. */
+case class RankCounterItem(rank: Int, id: String, score: Double)
+
+/** The ranking of one interval/dimension; `ts` is also rendered as a formatted "time" field when written as JSON. */
+case class RankCounterDimensionItem(interval: String, ts: Long, dimension: String, total: Double, ranks: Seq[RankCounterItem])
+
+case class RankCounterResultMeta(service: String, action: String)
+
+case class RankCounterResult(meta: RankCounterResultMeta, data: Seq[RankCounterDimensionItem])
+
+// Implicit instances carry explicit types so their resolution does not depend on inference order.
+object RankCounterItem {
+  implicit val format: play.api.libs.json.Format[RankCounterItem] = Json.format[RankCounterItem]
+}
+
+object RankCounterDimensionItem {
+  implicit val writes: Writes[RankCounterDimensionItem] = new Writes[RankCounterDimensionItem] {
+    def writes(item: RankCounterDimensionItem) = Json.obj(
+      "interval" -> item.interval,
+      "ts" -> item.ts,
+      "time" -> tsFormat.format(item.ts),
+      "dimension" -> item.dimension,
+      "total" -> item.total,
+      "ranks" -> item.ranks
+    )
+  }
+  implicit val reads: play.api.libs.json.Reads[RankCounterDimensionItem] = Json.reads[RankCounterDimensionItem]
+}
+
+object RankCounterResultMeta {
+  implicit val format: play.api.libs.json.Format[RankCounterResultMeta] = Json.format[RankCounterResultMeta]
+}
+
+object RankCounterResult {
+  implicit val format: play.api.libs.json.Format[RankCounterResult] = Json.format[RankCounterResult]
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/models/package.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/models/package.scala 
b/s2rest_play/app/models/package.scala
new file mode 100644
index 0000000..17fa8e1
--- /dev/null
+++ b/s2rest_play/app/models/package.scala
@@ -0,0 +1,8 @@
+import java.text.SimpleDateFormat
+
+/**
+ * Created by alec on 15. 4. 20..
+ */
+package object models {
+  /**
+   * Timestamp formatter using the pattern "yyyy-MM-dd HH:mm:ss".
+   *
+   * Declared as a `def` so every call gets its own instance: `SimpleDateFormat` is not
+   * safe to share between threads.
+   */
+  def tsFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/app/util/TestDataLoader.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/util/TestDataLoader.scala 
b/s2rest_play/app/util/TestDataLoader.scala
new file mode 100644
index 0000000..45a9b61
--- /dev/null
+++ b/s2rest_play/app/util/TestDataLoader.scala
@@ -0,0 +1,70 @@
+package util
+
+import java.io.File
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer}
+import scala.io.Source
+import scala.util.Random
+
+
+/**
+ * Loads test vertex ids from "./talk_vertices.txt" when the object is first accessed and
+ * hands out random ids from that data.
+ *
+ * Each input line is tab separated: the first field is the id and the last field is a number
+ * that is divided by `step` to obtain the histogram bucket of the id.
+ */
+object TestDataLoader {
+  val step = 100
+  val prob = 1.0
+  val (testIds, testIdsHist, testIdsHistCnt) = loadSeeds("./talk_vertices.txt")
+  val maxId = testIds.length
+
+  /**
+   * Picks a random id from the histogram bucket `histStep`.
+   *
+   * @return `None` when no id was loaded for that bucket
+   */
+  def randomId(histStep: Int) = {
+    for {
+      maxId <- testIdsHistCnt.get(histStep)
+      rIdx = Random.nextInt(maxId.toInt)
+      hist <- testIdsHist.get(histStep)
+      id = hist(rIdx)
+    } yield id
+  }
+
+  /** Picks a random id among all loaded ids. */
+  def randomId() = {
+    testIds(Random.nextInt(maxId))
+  }
+
+  /**
+   * Reads `filePath` and returns the sampled ids, the ids grouped per bucket and the number
+   * of ids per bucket. Only ids whose bucket is greater than 1 are kept, each with
+   * probability `prob`.
+   */
+  private def loadSeeds(filePath: String) = {
+    val histogram = new HashMap[Long, ListBuffer[Long]]
+    val histogramCnt = new HashMap[Long, Long]
+    val ids = new ArrayBuffer[Long]
+
+    val source = Source.fromFile(new File(filePath))
+    try {
+      for (line <- source.getLines) {
+        val parts = line.split("\\t")
+        val id = parts.head.toLong
+        val count = parts.last.toLong / step
+        if (count > 1 && Random.nextDouble < prob) {
+          histogram.getOrElseUpdate(count, new ListBuffer[Long]) += id
+          histogramCnt.put(count, histogramCnt.getOrElse(count, 0L) + 1L)
+          ids += id
+        }
+      }
+    } finally {
+      // The source was previously left open after the file had been read.
+      source.close()
+    }
+
+    (ids, histogram.map(t => (t._1 -> t._2.toArray[Long])), histogramCnt)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/build.sbt
----------------------------------------------------------------------
diff --git a/s2rest_play/build.sbt b/s2rest_play/build.sbt
new file mode 100755
index 0000000..6e0f0bd
--- /dev/null
+++ b/s2rest_play/build.sbt
@@ -0,0 +1,12 @@
+name := "s2rest_play"
+
+scalacOptions in Test ++= Seq("-Yrangepos")
+
+libraryDependencies ++= Seq(
+  ws,
+  filters,
+  "com.github.danielwegener" % "logback-kafka-appender" % "0.0.3"
+)
+
+enablePlugins(JavaServerAppPackaging)
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2rest_play/conf/application.conf
----------------------------------------------------------------------
diff --git a/s2rest_play/conf/application.conf 
b/s2rest_play/conf/application.conf
new file mode 100644
index 0000000..e69de29


Reply via email to