http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
index 217b6eb..33ec7d9 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala
@@ -3,9 +3,6 @@ package com.kakao.s2graph.core
 import com.kakao.s2graph.core.types.InnerValLike
 import play.api.libs.json.{JsNumber, JsString, JsValue}
 
-/**
- * Created by hsleep([email protected]) on 2015. 11. 5..
- */
 object OrderingUtil {
 
   implicit object JsValueOrdering extends Ordering[JsValue] {
@@ -38,9 +35,54 @@ object OrderingUtil {
       }
     }
   }
+
+  def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: 
Ordering[T]): Ordering[(T, T, T, T)] = {
+    new Ordering[(T, T, T, T)] {
+      override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
+        val len = ascendingLs.length
+        val it = ascendingLs.iterator
+        if (len >= 1) {
+          val (x, y) = it.next() match {
+            case true => tx -> ty
+            case false => ty -> tx
+          }
+          val compare1 = ord.compare(x._1, y._1)
+          if (compare1 != 0) return compare1
+        }
+
+        if (len >= 2) {
+          val (x, y) = it.next() match {
+            case true => tx -> ty
+            case false => ty -> tx
+          }
+          val compare2 = ord.compare(x._2, y._2)
+          if (compare2 != 0) return compare2
+        }
+
+        if (len >= 3) {
+          val (x, y) = it.next() match {
+            case true => tx -> ty
+            case false => ty -> tx
+          }
+          val compare3 = ord.compare(x._3, y._3)
+          if (compare3 != 0) return compare3
+        }
+
+        if (len >= 4) {
+          val (x, y) = it.next() match {
+            case true => tx -> ty
+            case false => ty -> tx
+          }
+          val compare4 = ord.compare(x._4, y._4)
+          if (compare4 != 0) return compare4
+        }
+        0
+      }
+    }
+  }
 }
 
-class SeqMultiOrdering[T: Ordering](ascendingLs: Seq[Boolean], 
defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends 
Ordering[Seq[T]] {
+class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean 
= true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] {
   override def compare(x: Seq[T], y: Seq[T]): Int = {
     val xe = x.iterator
     val ye = y.iterator
@@ -60,45 +102,45 @@ class SeqMultiOrdering[T: Ordering](ascendingLs: 
Seq[Boolean], defaultAscending:
   }
 }
 
-class TupleMultiOrdering[T: Ordering](ascendingLs: Seq[Boolean])(implicit ord: 
Ordering[T]) extends Ordering[(T, T, T, T)] {
-  override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
-    val len = ascendingLs.length
-    val it = ascendingLs.iterator
-    if (len >= 1) {
-      val (x, y) = it.next() match {
-        case true => tx -> ty
-        case false => ty -> tx
-      }
-      val compare1 = ord.compare(x._1, y._1)
-      if (compare1 != 0) return compare1
-    }
-
-    if (len >= 2) {
-      val (x, y) = it.next() match {
-        case true => tx -> ty
-        case false => ty -> tx
-      }
-      val compare2 = ord.compare(x._2, y._2)
-      if (compare2 != 0) return compare2
-    }
-
-    if (len >= 3) {
-      val (x, y) = it.next() match {
-        case true => tx -> ty
-        case false => ty -> tx
-      }
-      val compare3 = ord.compare(x._3, y._3)
-      if (compare3 != 0) return compare3
-    }
-
-    if (len >= 4) {
-      val (x, y) = it.next() match {
-        case true => tx -> ty
-        case false => ty -> tx
-      }
-      val compare4 = ord.compare(x._4, y._4)
-      if (compare4 != 0) return compare4
-    }
-    0
-  }
-}
+//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: 
Ordering[T]) extends Ordering[(T, T, T, T)] {
+//  override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
+//    val len = ascendingLs.length
+//    val it = ascendingLs.iterator
+//    if (len >= 1) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare1 = ord.compare(x._1, y._1)
+//      if (compare1 != 0) return compare1
+//    }
+//
+//    if (len >= 2) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare2 = ord.compare(x._2, y._2)
+//      if (compare2 != 0) return compare2
+//    }
+//
+//    if (len >= 3) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare3 = ord.compare(x._3, y._3)
+//      if (compare3 != 0) return compare3
+//    }
+//
+//    if (len >= 4) {
+//      val (x, y) = it.next() match {
+//        case true => tx -> ty
+//        case false => ty -> tx
+//      }
+//      val compare4 = ord.compare(x._4, y._4)
+//      if (compare4 != 0) return compare4
+//    }
+//    0
+//  }
+//}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
new file mode 100644
index 0000000..0a26d26
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
@@ -0,0 +1,439 @@
+package com.kakao.s2graph.core
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.types.{InnerVal, InnerValLike}
+import play.api.libs.json.{Json, _}
+
+import scala.collection.mutable.ListBuffer
+
+object PostProcess extends JSONParser {
+  /**
+    * Result Entity score field name
+    */
+  val timeoutResults = Json.obj("size" -> 0, "results" -> Json.arr(), 
"isTimeout" -> true)
+  val emptyResults = Json.obj("size" -> 0, "results" -> Json.arr(), "isEmpty" 
-> true)
+  def badRequestResults(ex: => Exception) = ex match {
+    case ex: BadQueryException => Json.obj("message" -> ex.msg)
+    case _ => Json.obj("message" -> ex.getMessage)
+  }
+
+  val SCORE_FIELD_NAME = "scoreSum"
+  val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", 
"_timestamp", "timestamp", "score", "props")
+
+  def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
exclude: Seq[QueryRequestWithResult]) = {
+    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+    //    filterNot {case (edge, score) => 
edge.props.contains(LabelMeta.degreeSeq)}
+    val groupedEdgesWithRank = (for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
queryRequest.query.filterOutFields))
+    } yield {
+      (queryRequest.queryParam, edge, score)
+    }).groupBy {
+      case (queryParam, edge, rank) if edge.labelWithDir.dir == 
GraphUtil.directions("in") =>
+        (queryParam.label.srcColumn, queryParam.label.label, 
queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
+      case (queryParam, edge, rank) =>
+        (queryParam.label.tgtColumn, queryParam.label.label, 
queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
+    }
+
+    val ret = for {
+      ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) 
<- groupedEdgesWithRank if !isDegreeEdge
+      edgesWithRanks = edgesAndRanks.groupBy(x => 
x._2.srcVertex).map(_._2.head)
+      id <- innerValToJsValue(target, tgtColumn.columnType)
+    } yield {
+      Json.obj("name" -> tgtColumn.columnName, "id" -> id,
+        SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
+        "label" -> labelName,
+        "aggr" -> Json.obj(
+          "name" -> srcColumn.columnName,
+          "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
+            innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
+          },
+          "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
+            Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, 
srcColumn.columnType),
+              "props" -> propsToJson(edge),
+              "score" -> rank
+            )
+          }
+        )
+      )
+    }
+
+    ret.toList
+  }
+
+  def sortWithFormatted(jsons: Seq[JsObject],
+                        scoreField: String = "scoreSum",
+                        queryRequestWithResultLs: Seq[QueryRequestWithResult],
+                        decrease: Boolean = true): JsObject = {
+    val ordering = if (decrease) -1 else 1
+    val sortedJsons = jsons.sortBy { jsObject => (jsObject \ 
scoreField).as[Double] * ordering }
+
+    if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, 
"results" -> sortedJsons)
+    else Json.obj(
+      "size" -> sortedJsons.size,
+      "results" -> sortedJsons,
+      "impressionId" -> 
queryRequestWithResultLs.head.queryRequest.query.impressionId()
+    )
+  }
+
+  private def toHashKey(edge: Edge, queryParam: QueryParam, fields: 
Seq[String], delimiter: String = ","): Int = {
+    val ls = for {
+      field <- fields
+    } yield {
+      field match {
+        case "from" | "_from" => edge.srcVertex.innerId
+        case "to" | "_to" => edge.tgtVertex.innerId
+        case "label" => edge.labelWithDir.labelId
+        case "direction" => 
JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
+        case "_timestamp" | "timestamp" => edge.ts
+        case _ =>
+          queryParam.label.metaPropsInvMap.get(field) match {
+            case None => throw new RuntimeException(s"unknow column: $field")
+            case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
+              case None => labelMeta.defaultValue
+              case Some(propVal) => propVal
+            }
+          }
+      }
+    }
+    val ret = ls.hashCode()
+    ret
+  }
+
+  def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
isSrcVertex: Boolean = false): Seq[Int] = {
+    for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      q = queryRequest.query
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+    } yield toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
+  }
+
+  def summarizeWithListExcludeFormatted(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
+    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
+    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs, decrease = true)
+  }
+
+  def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
exclude: Seq[QueryRequestWithResult]) = {
+    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
+    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs)
+  }
+
+  def summarizeWithListFormatted(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
+    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
+    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs)
+  }
+
+  def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult]): JsValue = {
+    toSimpleVertexArrJson(queryRequestWithResultLs, 
Seq.empty[QueryRequestWithResult])
+  }
+
+  private def orderBy(q: Query,
+                      orderByColumns: Seq[(String, Boolean)],
+                      rawEdges: ListBuffer[(Map[String, JsValue], Double, 
(Any, Any, Any, Any))])
+  : ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, Any))] = {
+    import com.kakao.s2graph.core.OrderingUtil._
+
+    if (q.withScore && orderByColumns.nonEmpty) {
+      val ascendingLs = orderByColumns.map(_._2)
+      rawEdges.sortBy(_._3)(OrderingUtil.TupleMultiOrdering[Any](ascendingLs))
+    } else {
+      rawEdges
+    }
+  }
+
+  private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, 
edge: Edge, column: String): Any = {
+    column match {
+      case "score" => score
+      case "timestamp" | "_timestamp" => edge.ts
+      case _ =>
+        keyWithJs.get(column) match {
+          case None => keyWithJs.get("props").map { js => (js \ 
column).as[JsValue] }.get
+          case Some(x) => x
+        }
+    }
+  }
+
+  def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsValue = {
+    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+
+    val degrees = ListBuffer[JsValue]()
+    val rawEdges = ListBuffer[(Map[String, JsValue], Double, (Any, Any, Any, 
Any))]()
+
+    if (queryRequestWithResultLs.isEmpty) {
+      Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" -> Json.arr())
+    } else {
+      val (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get
+      val query = queryRequest.query
+      val queryParam = queryRequest.queryParam
+
+      val orderByColumns = query.orderByColumns.filter { case (column, _) =>
+        column match {
+          case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" 
=> true
+          case _ =>
+            queryParam.label.metaPropNames.contains(column)
+        }
+      }
+
+      /** build result jsons */
+      for {
+        queryRequestWithResult <- queryRequestWithResultLs
+        (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+        queryParam = queryRequest.queryParam
+        edgeWithScore <- queryResult.edgeWithScoreLs
+        (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+        if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
query.filterOutFields))
+      } {
+        // edge to json
+        val (srcColumn, _) = 
queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
+        val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, 
srcColumn.columnType)
+        if (edge.isDegree && fromOpt.isDefined) {
+          degrees += Json.obj(
+            "from" -> fromOpt.get,
+            "label" -> queryRequest.queryParam.label.label,
+            "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
+            LabelMeta.degree.name -> 
innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
+          )
+        } else {
+          val keyWithJs = edgeToJson(edge, score, queryRequest.query, 
queryRequest.queryParam)
+          val orderByValues: (Any, Any, Any, Any) = orderByColumns.length 
match {
+            case 0 =>
+              (None, None, None, None)
+            case 1 =>
+              val it = orderByColumns.iterator
+              val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              (v1, None, None, None)
+            case 2 =>
+              val it = orderByColumns.iterator
+              val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              (v1, v2, None, None)
+            case 3 =>
+              val it = orderByColumns.iterator
+              val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              (v1, v2, v3, None)
+            case _ =>
+              val it = orderByColumns.iterator
+              val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+              (v1, v2, v3, v4)
+          }
+
+          val currentEdge = (keyWithJs, score, orderByValues)
+          rawEdges += currentEdge
+        }
+      }
+
+      if (query.groupByColumns.isEmpty) {
+        // ordering
+        val edges = orderBy(query, orderByColumns, rawEdges).map(_._1)
+
+        Json.obj(
+          "size" -> edges.size,
+          "degrees" -> degrees,
+          "results" -> edges,
+          "impressionId" -> query.impressionId()
+        )
+      } else {
+        val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
+          val props = keyWithJs.get("props")
+
+          for {
+            column <- query.groupByColumns
+            value <- keyWithJs.get(column) match {
+              case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
+              case Some(x) => Some(x)
+            }
+          } yield column -> value
+        }
+
+        val groupedEdges = {
+          for {
+            (groupByKeyVals, groupedRawEdges) <- grouped
+          } yield {
+            val scoreSum = groupedRawEdges.map(x => x._2).sum
+            // ordering
+            val edges = orderBy(query, orderByColumns, 
groupedRawEdges).map(_._1)
+            Json.obj(
+              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "scoreSum" -> scoreSum,
+              "agg" -> edges
+            )
+          }
+        }
+
+        val groupedSortedJsons = groupedEdges.toList.sortBy { jsVal => -1 * 
(jsVal \ "scoreSum").as[Double] }
+        Json.obj(
+          "size" -> groupedEdges.size,
+          "degrees" -> degrees,
+          "results" -> groupedSortedJsons,
+          "impressionId" -> query.impressionId()
+        )
+      }
+    }
+  }
+
+  def verticesToJson(vertices: Iterable[Vertex]) = {
+    Json.toJson(vertices.flatMap { v => vertexToJson(v) })
+  }
+
+  def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, 
JsValue] = {
+    for {
+      (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
+      labelMeta <- queryParam.label.metaPropsMap.get(seq)
+      jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
+    } yield labelMeta.name -> jsValue
+  }
+
+  private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, 
queryParam: QueryParam): JsValue = {
+    if (parentEdges.isEmpty) {
+      JsNull
+    } else {
+      val parents = for {
+        parent <- parentEdges
+        (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
+        parentQueryParam = QueryParam(parentEdge.labelWithDir)
+        parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if 
parents != JsNull
+      } yield {
+        val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
+        val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, 
parentQueryParam) + ("parents" -> parents)
+        Json.toJson(edgeJson)
+      }
+
+      Json.toJson(parents)
+    }
+  }
+
+  /** TODO */
+  def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: 
QueryParam): Map[String, JsValue] = {
+    val (srcColumn, tgtColumn) = 
queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
+
+    val kvMapOpt = for {
+      from <- innerValToJsValue(edge.srcVertex.id.innerId, 
srcColumn.columnType)
+      to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
+    } yield {
+      val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else 
(reservedColumns & q.selectColumnsSet) + "props"
+
+      val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ 
propsToJson(edge, q, queryParam)
+      val propsMap = if (q.selectColumnsSet.nonEmpty) 
_propsMap.filterKeys(q.selectColumnsSet) else _propsMap
+
+      val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, 
column) =>
+        val jsValue = column match {
+          case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - 
(System.currentTimeMillis() - queryParam.timestamp))
+          case "from" => from
+          case "to" => to
+          case "label" => JsString(queryParam.label.label)
+          case "direction" => 
JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
+          case "_timestamp" | "timestamp" => JsNumber(edge.ts)
+          case "score" => JsNumber(score)
+          case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
+          case _ => JsNull
+        }
+
+        if (jsValue == JsNull) map else map + (column -> jsValue)
+      }
+      kvMap
+    }
+
+    kvMapOpt.getOrElse(Map.empty)
+  }
+
+  def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): 
Map[String, JsValue] = {
+    val kvs = edgeToJsonInner(edge, score, q, queryParam)
+    if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> 
Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
+    else kvs
+  }
+
+  def vertexToJson(vertex: Vertex): Option[JsObject] = {
+    val serviceColumn = ServiceColumn.findById(vertex.id.colId)
+
+    for {
+      id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
+    } yield {
+      Json.obj("serviceName" -> serviceColumn.service.serviceName,
+        "columnName" -> serviceColumn.columnName,
+        "id" -> id, "props" -> propsToJson(vertex),
+        "timestamp" -> vertex.ts,
+        //        "belongsTo" -> vertex.belongLabelIds)
+        "belongsTo" -> 
vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
+    }
+  }
+
+  private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, 
InnerValLike]) = {
+    for {
+      (seq, value) <- props
+      name <- seqsToNames.get(seq)
+    } yield (name, value)
+  }
+
+  private def propsToJson(vertex: Vertex) = {
+    val serviceColumn = vertex.serviceColumn
+    val props = for {
+      (propKey, innerVal) <- vertex.props
+      columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, 
propKey.toByte, useCache = true)
+      jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
+    } yield {
+      (columnMeta.name -> jsValue)
+    }
+    props.toMap
+  }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def propsToJson(edge: Edge) = {
+    for {
+      (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
+      metaProp <- edge.label.metaPropsMap.get(seq)
+      jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
+    } yield {
+      (metaProp.name, jsValue)
+    }
+  }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def summarizeWithListExclude(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
+    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+
+
+    val groupedEdgesWithRank = (for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
queryRequest.query.filterOutFields))
+    } yield {
+      (edge, score)
+    }).groupBy { case (edge, score) =>
+      (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
+    }
+
+    val jsons = for {
+      ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
+      (edges, ranks) = edgesAndRanks.groupBy(x => 
x._1.srcVertex).map(_._2.head).unzip
+      tgtId <- innerValToJsValue(target, tgtColumn.columnType)
+    } yield {
+      Json.obj(tgtColumn.columnName -> tgtId,
+        s"${srcColumn.columnName}s" ->
+          edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, 
srcColumn.columnType)), "scoreSum" -> ranks.sum)
+    }
+    val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ 
"scoreSum").as[Double] }.reverse
+    if (queryRequestWithResultLs.isEmpty) {
+      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
+    } else {
+      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
+        "impressionId" -> 
queryRequestWithResultLs.head.queryRequest.query.impressionId())
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
index f40afab..d41e9fa 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
@@ -54,6 +54,19 @@ object Model {
       res
     }
   }
+
+  def shutdown() = {
+    ConnectionPool.closeAll()
+  }
+
+  def loadCache() = {
+    Service.findAll()
+    ServiceColumn.findAll()
+    Label.findAll()
+    LabelMeta.findAll()
+    LabelIndex.findAll()
+    ColumnMeta.findAll()
+  }
 }
 
 trait Model[V] extends SQLSyntaxSupport[V] {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
new file mode 100644
index 0000000..98cc68b
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
@@ -0,0 +1,509 @@
+package com.kakao.s2graph.core.rest
+
+import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, 
ModelNotFoundException}
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.parsers.WhereParser
+import com.kakao.s2graph.core.types._
+import com.typesafe.config.Config
+import play.api.libs.json._
+
+import scala.util.{Failure, Success, Try}
+
+class RequestParser(config: Config) extends JSONParser {
+
+  import Management.JsonModel._
+
+  val hardLimit = 100000
+  val defaultLimit = 100
+  val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
+  val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
+  val DefaultCluster = config.getString("hbase.zookeeper.quorum")
+  val DefaultCompressionAlgorithm = 
config.getString("hbase.table.compression.algorithm")
+  val DefaultPhase = config.getString("phase")
+
+  private def extractScoring(labelId: Int, value: JsValue) = {
+    val ret = for {
+      js <- parse[Option[JsObject]](value, "scoring")
+    } yield {
+      for {
+        (k, v) <- js.fields
+        labelOrderType <- LabelMeta.findByName(labelId, k)
+      } yield {
+        val value = v match {
+          case n: JsNumber => n.as[Double]
+          case _ => throw new Exception("scoring weight should be double.")
+        }
+        (labelOrderType.seq, value)
+      }
+    }
+    ret
+  }
+
+  def extractInterval(label: Label, jsValue: JsValue) = {
+    def extractKv(js: JsValue) = js match {
+      case JsObject(obj) => obj
+      case JsArray(arr) => arr.flatMap {
+        case JsObject(obj) => obj
+        case _ => throw new RuntimeException(s"cannot support json type $js")
+      }
+      case _ => throw new RuntimeException(s"cannot support json type: $js")
+    }
+
+    val ret = for {
+      js <- parse[Option[JsObject]](jsValue, "interval")
+      fromJs <- (js \ "from").asOpt[JsValue]
+      toJs <- (js \ "to").asOpt[JsValue]
+    } yield {
+      val from = Management.toProps(label, extractKv(fromJs))
+      val to = Management.toProps(label, extractKv(toJs))
+      (from, to)
+    }
+
+    ret
+  }
+
+  def extractDuration(label: Label, jsValue: JsValue) = {
+    for {
+      js <- parse[Option[JsObject]](jsValue, "duration")
+    } yield {
+      val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue)
+      val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue)
+
+      (minTs, maxTs)
+    }
+  }
+
+  def extractHas(label: Label, jsValue: JsValue) = {
+    val ret = for {
+      js <- parse[Option[JsObject]](jsValue, "has")
+    } yield {
+      for {
+        (k, v) <- js.fields
+        labelMeta <- LabelMeta.findByName(label.id.get, k)
+        value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion)
+      } yield {
+        labelMeta.seq -> value
+      }
+    }
+    ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
+  }
+
+  def extractWhere(labelMap: Map[String, Label], jsValue: JsValue) = {
+    (jsValue \ "where").asOpt[String] match {
+      case None => Success(WhereParser.success)
+      case Some(where) =>
+        WhereParser(labelMap).parse(where) match {
+          case s@Success(_) => s
+          case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
+        }
+    }
+  }
+
+  def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): 
Seq[Vertex] = {
+    val vertices = for {
+      label <- Label.findByName(labelName).toSeq
+      serviceColumn = if (direction == "out") label.srcColumn else 
label.tgtColumn
+      id <- ids
+      innerId <- jsValueToInnerVal(id, serviceColumn.columnType, 
label.schemaVersion)
+    } yield {
+      Vertex(SourceVertexId(serviceColumn.id.get, innerId), 
System.currentTimeMillis())
+    }
+    vertices.toSeq
+  }
+
+  def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
+    try {
+      val vertices =
+        (for {
+          value <- parse[List[JsValue]](jsValue, "srcVertices")
+          serviceName = parse[String](value, "serviceName")
+          column = parse[String](value, "columnName")
+        } yield {
+          val service = Service.findByName(serviceName).getOrElse(throw 
BadQueryException("service not found"))
+          val col = ServiceColumn.find(service.id.get, column).getOrElse(throw 
BadQueryException("bad column name"))
+          val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ 
"ids").asOpt[List[JsValue]])
+          for {
+            idVal <- idOpt ++ idsOpt.toSeq.flatten
+
+            /** bug, need to use labels schemaVersion  */
+            innerVal <- jsValueToInnerVal(idVal, col.columnType, 
col.schemaVersion)
+          } yield {
+            Vertex(SourceVertexId(col.id.get, innerVal), 
System.currentTimeMillis())
+          }
+        }).flatten
+
+      if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is 
empty")
+
+      val filterOutFields = (jsValue \ 
"filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
+      val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => 
toQuery(v) }.map { q => q.copy(filterOutFields = filterOutFields) }
+      val steps = parse[Vector[JsValue]](jsValue, "steps")
+      val removeCycle = (jsValue \ 
"removeCycle").asOpt[Boolean].getOrElse(true)
+      val selectColumns = (jsValue \ 
"select").asOpt[List[String]].getOrElse(List.empty)
+      val groupByColumns = (jsValue \ 
"groupBy").asOpt[List[String]].getOrElse(List.empty)
+      val orderByColumns: List[(String, Boolean)] = (jsValue \ 
"orderBy").asOpt[List[JsObject]].map { jsLs =>
+        for {
+          js <- jsLs
+          (column, orderJs) <- js.fields
+        } yield {
+          val ascending = orderJs.as[String].toUpperCase match {
+            case "ASC" => true
+            case "DESC" => false
+          }
+          column -> ascending
+        }
+      }.getOrElse(List("score" -> false, "timestamp" -> false))
+      val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true)
+      val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false)
+
+      // TODO: throw exception, when label dosn't exist
+      val labelMap = (for {
+        js <- jsValue \\ "label"
+        labelName <- js.asOpt[String]
+        label <- Label.findByName(labelName)
+      } yield (labelName, label)).toMap
+
+      val querySteps =
+        steps.zipWithIndex.map { case (step, stepIdx) =>
+          val labelWeights = step match {
+            case obj: JsObject =>
+              val converted = for {
+                (k, v) <- (obj \ 
"weights").asOpt[JsObject].getOrElse(Json.obj()).fields
+                l <- Label.findByName(k)
+              } yield {
+                l.id.get -> v.toString().toDouble
+              }
+              converted.toMap
+            case _ => Map.empty[Int, Double]
+          }
+          val queryParamJsVals = step match {
+            case arr: JsArray => arr.as[List[JsValue]]
+            case obj: JsObject => (obj \ "step").as[List[JsValue]]
+            case _ => List.empty[JsValue]
+          }
+          val nextStepScoreThreshold = step match {
+            case obj: JsObject => (obj \ 
"nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
+            case _ => QueryParam.DefaultThreshold
+          }
+          val nextStepLimit = step match {
+            case obj: JsObject => (obj \ 
"nextStepLimit").asOpt[Int].getOrElse(-1)
+            case _ => -1
+          }
+          val cacheTTL = step match {
+            case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1)
+            case _ => -1
+          }
+          val queryParams =
+            for {
+              labelGroup <- queryParamJsVals
+              queryParam <- parseQueryParam(labelMap, labelGroup)
+            } yield {
+              val (_, columnName) =
+                if (queryParam.labelWithDir.dir == 
GraphUtil.directions("out")) {
+                  (queryParam.label.srcService.serviceName, 
queryParam.label.srcColumnName)
+                } else {
+                  (queryParam.label.tgtService.serviceName, 
queryParam.label.tgtColumnName)
+                }
+              //FIXME:
+              if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => 
v.serviceColumn.columnName == columnName)) {
+                throw BadQueryException("srcVertices contains incompatiable 
serviceName or columnName with first step.")
+              }
+
+              queryParam
+            }
+          Step(queryParams.toList, labelWeights = labelWeights,
+            //            scoreThreshold = stepThreshold,
+            nextStepScoreThreshold = nextStepScoreThreshold,
+            nextStepLimit = nextStepLimit,
+            cacheTTL = cacheTTL)
+
+        }
+
+      val ret = Query(
+        vertices,
+        querySteps,
+        removeCycle = removeCycle,
+        selectColumns = selectColumns,
+        groupByColumns = groupByColumns,
+        orderByColumns = orderByColumns,
+        filterOutQuery = filterOutQuery,
+        filterOutFields = filterOutFields,
+        withScore = withScore,
+        returnTree = returnTree
+      )
+      //      logger.debug(ret.toString)
+      ret
+    } catch {
+      case e: BadQueryException =>
+        throw e
+      case e: ModelNotFoundException =>
+        throw BadQueryException(e.getMessage, e)
+      case e: Exception =>
+        throw BadQueryException(s"$jsValue, $e", e)
+    }
+  }
+
+  private def parseQueryParam(labelMap: Map[String, Label], labelGroup: 
JsValue): Option[QueryParam] = {
+    for {
+      labelName <- parse[Option[String]](labelGroup, "label")
+    } yield {
+      val label = Label.findByName(labelName).getOrElse(throw 
BadQueryException(s"$labelName not found"))
+      val direction = parse[Option[String]](labelGroup, 
"direction").map(GraphUtil.toDirection(_)).getOrElse(0)
+      val limit = {
+        parse[Option[Int]](labelGroup, "limit") match {
+          case None => defaultLimit
+          case Some(l) if l < 0 => Int.MaxValue
+          case Some(l) if l >= 0 =>
+            val default = hardLimit
+            Math.min(l, default)
+        }
+      }
+      val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0)
+      val interval = extractInterval(label, labelGroup)
+      val duration = extractDuration(label, labelGroup)
+      val scoring = extractScoring(label.id.get, 
labelGroup).getOrElse(List.empty[(Byte, Double)]).toList
+      val exclude = parse[Option[Boolean]](labelGroup, 
"exclude").getOrElse(false)
+      val include = parse[Option[Boolean]](labelGroup, 
"include").getOrElse(false)
+      val hasFilter = extractHas(label, labelGroup)
+      val labelWithDir = LabelWithDirection(label.id.get, direction)
+      val indexNameOpt = (labelGroup \ "index").asOpt[String]
+      val indexSeq = indexNameOpt match {
+        case None => label.indexSeqsMap.get(scoring.map(kv => 
kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq)
+        case Some(indexName) => 
label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new 
RuntimeException("cannot find index"))
+      }
+      val where = extractWhere(labelMap, labelGroup)
+      val includeDegree = (labelGroup \ 
"includeDegree").asOpt[Boolean].getOrElse(true)
+      val rpcTimeout = (labelGroup \ 
"rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout)
+      val maxAttempt = (labelGroup \ 
"maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt)
+      val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { 
jsVal =>
+        jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, 
label.schemaVersion)
+      }
+      val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L)
+      val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { 
jsVal =>
+        val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0)
+        val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1)
+        if (decayRate >= 1.0 || decayRate <= 0.0) throw new 
BadQueryException("decay rate should be 0.0 ~ 1.0")
+        val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 
24.0)
+        TimeDecay(initial, decayRate, timeUnit)
+      }
+      val threshold = (labelGroup \ 
"threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
+      // TODO: refactor this. dirty
+      val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => 
Query.DuplicatePolicy(s))
+
+      val outputField = (labelGroup \ "outputField").asOpt[String].map(s => 
Json.arr(Json.arr(s)))
+      val transformer = if (outputField.isDefined) outputField else 
(labelGroup \ "transform").asOpt[JsValue]
+      val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
+      val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
+
+      // FIXME: Order of command matter
+      QueryParam(labelWithDir)
+        .sample(sample)
+        .limit(offset, limit)
+        .rank(RankParam(label.id.get, scoring))
+        .exclude(exclude)
+        .include(include)
+        .interval(interval)
+        .duration(duration)
+        .has(hasFilter)
+        .labelOrderSeq(indexSeq)
+        .interval(interval)
+        .where(where)
+        .duplicatePolicy(duplicate)
+        .includeDegree(includeDegree)
+        .rpcTimeout(rpcTimeout)
+        .maxAttempt(maxAttempt)
+        .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt)
+        .cacheTTLInMillis(cacheTTL)
+        .timeDecay(timeDecayFactor)
+        .threshold(threshold)
+        .transformer(transformer)
+        .scorePropagateOp(scorePropagateOp)
+    }
+  }
+
+  private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = 
{
+    (js \ key).validate[R]
+      .fold(
+        errors => {
+          val msg = (JsError.toFlatJson(errors) \ 
"obj").as[List[JsValue]].map(x => x \ "msg")
+          val e = Json.obj("args" -> key, "error" -> msg)
+          throw new GraphExceptions.JsonParseException(Json.obj("error" -> 
key).toString)
+        },
+        r => {
+          r
+        })
+  }
+
+  def toJsValues(jsValue: JsValue): List[JsValue] = {
+    jsValue match {
+      case obj: JsObject => List(obj)
+      case arr: JsArray => arr.as[List[JsValue]]
+      case _ => List.empty[JsValue]
+    }
+
+  }
+
+  def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
+    toJsValues(jsValue).map(toEdge(_, operation))
+  }
+
+  def toEdge(jsValue: JsValue, operation: String) = {
+
+    val srcId = parse[JsValue](jsValue, "from") match {
+      case s: JsString => s.as[String]
+      case o@_ => s"${o}"
+    }
+    val tgtId = parse[JsValue](jsValue, "to") match {
+      case s: JsString => s.as[String]
+      case o@_ => s"${o}"
+    }
+    val label = parse[String](jsValue, "label")
+    val timestamp = parse[Long](jsValue, "timestamp")
+    val direction = parse[Option[String]](jsValue, "direction").getOrElse("")
+    val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
+    Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, 
props.toString)
+
+  }
+
+  def toVertices(jsValue: JsValue, operation: String, serviceName: 
Option[String] = None, columnName: Option[String] = None) = {
+    toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName))
+  }
+
+  def toVertex(jsValue: JsValue, operation: String, serviceName: 
Option[String] = None, columnName: Option[String] = None): Vertex = {
+    val id = parse[JsValue](jsValue, "id")
+    val ts = parse[Option[Long]](jsValue, 
"timestamp").getOrElse(System.currentTimeMillis())
+    val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") 
else serviceName.get
+    val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") 
else columnName.get
+    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
+    Management.toVertex(ts, operation, id.toString, sName, cName, 
props.toString)
+  }
+
+  def toPropElements(jsObj: JsValue) = Try {
+    val propName = (jsObj \ "name").as[String]
+    val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
+    val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
+      case JsString(s) => s
+      case _@js => js.toString
+    }
+    Prop(propName, defaultValue, dataType)
+  }
+
+  def toPropsElements(jsValue: JsValue): Seq[Prop] = for {
+    jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
+  } yield {
+    val propName = (jsObj \ "name").as[String]
+    val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
+    val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
+      case JsString(s) => s
+      case _@js => js.toString
+    }
+    Prop(propName, defaultValue, dataType)
+  }
+
+  def toIndicesElements(jsValue: JsValue): Seq[Index] = for {
+    jsObj <- jsValue.as[Seq[JsValue]]
+    indexName = (jsObj \ "name").as[String]
+    propNames = (jsObj \ "propNames").as[Seq[String]]
+  } yield Index(indexName, propNames)
+
+  def toLabelElements(jsValue: JsValue) = Try {
+    val labelName = parse[String](jsValue, "label")
+    val srcServiceName = parse[String](jsValue, "srcServiceName")
+    val tgtServiceName = parse[String](jsValue, "tgtServiceName")
+    val srcColumnName = parse[String](jsValue, "srcColumnName")
+    val tgtColumnName = parse[String](jsValue, "tgtColumnName")
+    val srcColumnType = parse[String](jsValue, "srcColumnType")
+    val tgtColumnType = parse[String](jsValue, "tgtColumnType")
+    val serviceName = (jsValue \ 
"serviceName").asOpt[String].getOrElse(tgtServiceName)
+    val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true)
+
+    val allProps = toPropsElements(jsValue \ "props")
+    val indices = toIndicesElements(jsValue \ "indices")
+
+    val consistencyLevel = (jsValue \ 
"consistencyLevel").asOpt[String].getOrElse("weak")
+
+    // expect new label don`t provide hTableName
+    val hTableName = (jsValue \ "hTableName").asOpt[String]
+    val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
+    val schemaVersion = (jsValue \ 
"schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
+    val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
+    val compressionAlgorithm = (jsValue \ 
"compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
+
+    (labelName, srcServiceName, srcColumnName, srcColumnType,
+      tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
+      indices, allProps, consistencyLevel, hTableName, hTableTTL, 
schemaVersion, isAsync, compressionAlgorithm)
+  }
+
+  def toIndexElements(jsValue: JsValue) = Try {
+    val labelName = parse[String](jsValue, "label")
+    val indices = toIndicesElements(jsValue \ "indices")
+    (labelName, indices)
+  }
+
+  def toServiceElements(jsValue: JsValue) = {
+    val serviceName = parse[String](jsValue, "serviceName")
+    val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster)
+    val hTableName = (jsValue \ 
"hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}")
+    val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1)
+    val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
+    val compressionAlgorithm = (jsValue \ 
"compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
+    (serviceName, cluster, hTableName, preSplitSize, hTableTTL, 
compressionAlgorithm)
+  }
+
+  def toServiceColumnElements(jsValue: JsValue) = Try {
+    val serviceName = parse[String](jsValue, "serviceName")
+    val columnName = parse[String](jsValue, "columnName")
+    val columnType = parse[String](jsValue, "columnType")
+    val props = toPropsElements(jsValue \ "props")
+    (serviceName, columnName, columnType, props)
+  }
+
+  def toCheckEdgeParam(jsValue: JsValue) = {
+    val params = jsValue.as[List[JsValue]]
+    var isReverted = false
+    val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]()
+    val quads = for {
+      param <- params
+      labelName <- (param \ "label").asOpt[String]
+      direction <- GraphUtil.toDir((param \ 
"direction").asOpt[String].getOrElse("out"))
+      label <- Label.findByName(labelName)
+      srcId <- jsValueToInnerVal((param \ "from").as[JsValue], 
label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
+      tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], 
label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
+    } yield {
+      val labelWithDir = LabelWithDirection(label.id.get, direction)
+      labelWithDirs += labelWithDir
+      val (src, tgt, dir) = if (direction == 1) {
+        isReverted = true
+        (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, 
tgtId)),
+          Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, 
srcId)), 0)
+      } else {
+        (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, 
srcId)),
+          Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, 
tgtId)), 0)
+      }
+      (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
+    }
+
+    (quads, isReverted)
+  }
+
+  def toGraphElements(str: String): Seq[GraphElement] = {
+    val edgeStrs = str.split("\\n")
+
+    for {
+      edgeStr <- edgeStrs
+      str <- GraphUtil.parseString(edgeStr)
+      element <- Graph.toGraphElement(str)
+    } yield element
+  }
+
+  def toDeleteParam(json: JsValue) = {
+    val labelName = (json \ "label").as[String]
+    val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
+    val direction = (json \ "direction").asOpt[String].getOrElse("out")
+
+    val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
+    val ts = (json \ 
"timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
+    val vertices = toVertices(labelName, direction, ids)
+    (labels, direction, ids, ts, vertices)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
new file mode 100644
index 0000000..bef1dec
--- /dev/null
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestCaller.scala
@@ -0,0 +1,183 @@
+package com.kakao.s2graph.core.rest
+
+import java.net.URL
+
+import com.kakao.s2graph.core.GraphExceptions.BadQueryException
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import com.kakao.s2graph.core.utils.logger
+import play.api.libs.json._
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+
+/**
+  * Public API only return Future.successful or Future.failed
+  * Don't throw exception
+  */
+class RestCaller(graph: Graph)(implicit ec: ExecutionContext) {
+  val s2Parser = new RequestParser(graph.config)
+
+  /**
+    * Public APIS
+    */
+  def experiment(contentsBody: JsValue, accessToken: String, experimentName: 
String, uuid: String): Future[(JsValue, String)] = {
+    try {
+      val bucketOpt = for {
+        service <- Service.findByAccessToken(accessToken)
+        experiment <- Experiment.findBy(service.id.get, experimentName)
+        bucket <- experiment.findBucket(uuid)
+      } yield bucket
+
+      val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is 
not found"))
+      if (bucket.isGraphQuery) buildRequestInner(contentsBody, bucket, 
uuid).map(_ -> bucket.impressionId)
+      else throw new RuntimeException("not supported yet")
+    } catch {
+      case e: Exception => Future.failed(e)
+    }
+  }
+
+  def uriMatch(uri: String, jsQuery: JsValue): Future[JsValue] = {
+    try {
+      uri match {
+        case "/graphs/getEdges" => 
getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)
+        case "/graphs/getEdges/grouped" => 
getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)
+        case "/graphs/getEdgesExcluded" => 
getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)
+        case "/graphs/getEdgesExcluded/grouped" => 
getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)
+        case "/graphs/checkEdges" => checkEdges(jsQuery)
+        case "/graphs/getEdgesGrouped" => 
getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)
+        case "/graphs/getEdgesGroupedExcluded" => 
getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)
+        case "/graphs/getEdgesGroupedExcludedFormatted" => 
getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)
+        case "/graphs/getVertices" => getVertices(jsQuery)
+        case _ => throw new RuntimeException("route is not found")
+      }
+    } catch {
+      case e: Exception => Future.failed(e)
+    }
+  }
+
+  def checkEdges(jsValue: JsValue): Future[JsValue] = {
+    try {
+      val (quads, isReverted) = s2Parser.toCheckEdgeParam(jsValue)
+
+      graph.checkEdges(quads).map { case queryRequestWithResultLs =>
+        val edgeJsons = for {
+          queryRequestWithResult <- queryRequestWithResultLs
+          (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+          edgeWithScore <- queryResult.edgeWithScoreLs
+          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+          convertedEdge = if (isReverted) edge.duplicateEdge else edge
+          edgeJson = PostProcess.edgeToJson(convertedEdge, score, 
queryRequest.query, queryRequest.queryParam)
+        } yield Json.toJson(edgeJson)
+
+        Json.toJson(edgeJsons)
+      }
+    } catch {
+      case e: Exception => Future.failed(e)
+    }
+  }
+
+  /**
+    * Private APIS
+    */
+  private def eachQuery(post: (Seq[QueryRequestWithResult], 
Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
+    val filterOutQueryResultsLs = q.filterOutQuery match {
+      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
+      case None => Future.successful(Seq.empty)
+    }
+
+    for {
+      queryResultsLs <- graph.getEdges(q)
+      filterOutResultsLs <- filterOutQueryResultsLs
+    } yield {
+      val json = post(queryResultsLs, filterOutResultsLs)
+      json
+    }
+  }
+
+  private def getEdgesAsync(jsonQuery: JsValue)
+                           (post: (Seq[QueryRequestWithResult], 
Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+
+    val fetch = eachQuery(post) _
+    jsonQuery match {
+      case JsArray(arr) => 
Future.traverse(arr.map(s2Parser.toQuery(_)))(fetch).map(JsArray)
+      case obj@JsObject(_) => fetch(s2Parser.toQuery(obj))
+      case _ => throw BadQueryException("Cannot support")
+    }
+  }
+
+  private def getEdgesExcludedAsync(jsonQuery: JsValue)
+                                   (post: (Seq[QueryRequestWithResult], 
Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
+    val q = s2Parser.toQuery(jsonQuery)
+    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
+
+    val fetchFuture = graph.getEdges(q)
+    val excludeFuture = graph.getEdges(filterOutQuery)
+
+    for {
+      queryResultLs <- fetchFuture
+      exclude <- excludeFuture
+    } yield {
+      post(queryResultLs, exclude)
+    }
+  }
+
+  private def getVertices(jsValue: JsValue) = {
+    val jsonQuery = jsValue
+    val ts = System.currentTimeMillis()
+    val props = "{}"
+
+    val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
+      val serviceName = (js \ "serviceName").as[String]
+      val columnName = (js \ "columnName").as[String]
+      for (id <- (js \ 
"ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
+        Management.toVertex(ts, "insert", id.toString, serviceName, 
columnName, props)
+      }
+    }
+
+    graph.getVertices(vertices) map { vertices => 
PostProcess.verticesToJson(vertices) }
+  }
+
+
+  private def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: 
Bucket, uuid: String): JsValue = {
+    var body = bucket.requestBody.replace("#uuid", uuid)
+    for {
+      requestKeyJson <- requestKeyJsonOpt
+      jsObj <- requestKeyJson.asOpt[JsObject]
+      (key, value) <- jsObj.fieldSet
+    } {
+      val replacement = value match {
+        case JsString(s) => s
+        case _ => value.toString
+      }
+      body = body.replace(key, replacement)
+    }
+
+    Try(Json.parse(body)).recover {
+      case e: Exception =>
+        throw new BadQueryException(s"wrong or missing template parameter: 
${e.getMessage.takeWhile(_ != '\n')}")
+    } get
+  }
+
+  private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: 
String): Future[JsValue] = {
+    if (bucket.isEmpty) Future.successful(PostProcess.emptyResults)
+    else {
+      val jsonBody = makeRequestJson(Option(contentsBody), bucket, uuid)
+      val url = new URL(bucket.apiPath)
+      val path = url.getPath()
+
+      // dummy log for sampling
+      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
+
+      logger.info(experimentLog)
+
+      uriMatch(path, jsonBody)
+    }
+  }
+
+  def calcSize(js: JsValue): Int = js match {
+    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
+    case JsArray(seq) => seq.map(js => (js \ 
"size").asOpt[Int].getOrElse(0)).sum
+    case _ => 0
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
index 824bb0d..717a5ac 100644
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
+++ 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilder.scala
@@ -90,7 +90,7 @@ class AsynchbaseQueryBuilder(storage: 
AsynchbaseStorage)(implicit ec: ExecutionC
   }
 
   val maxSize = storage.config.getInt("future.cache.max.size")
-  val futureCacheTTL = storage.config.getInt("future.cache.max.idle.ttl")
+  val futureCacheTTL = 
storage.config.getInt("future.cache.expire.after.access")
   val futureCache = CacheBuilder.newBuilder()
   .initialCapacity(maxSize)
   .concurrencyLevel(Runtime.getRuntime.availableProcessors())

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
index 4f2ba9f..d281017 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
@@ -1,7 +1,7 @@
 package com.kakao.s2graph.core.utils
 
-import play.api.Logger
 import play.api.libs.json.JsValue
+import org.slf4j.LoggerFactory
 
 import scala.language.{higherKinds, implicitConversions}
 
@@ -29,8 +29,8 @@ object logger {
     }
   }
 
-  private val logger = Logger("application")
-  private val errorLogger = Logger("error")
+  private val logger = LoggerFactory.getLogger("application")
+  private val errorLogger = LoggerFactory.getLogger("error")
 
   def info[T: Loggable](msg: => T) = 
logger.info(implicitly[Loggable[T]].toLogMessage(msg))
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala 
b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala
index 6374d43..2aad32f 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/EdgeTest.scala
@@ -4,8 +4,37 @@ import com.kakao.s2graph.core.mysqls.LabelMeta
 import com.kakao.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import com.kakao.s2graph.core.utils.logger
 import org.scalatest.FunSuite
+import org.scalatest.matchers.Matcher
 
 class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
+  initTests()
+
+  test("toLogString") {
+    val testLabelName = labelNameV2
+    val bulkQueries = List(
+      ("1445240543366", "update", "{\"is_blocked\":true}"),
+      ("1445240543362", "insert", "{\"is_hidden\":false}"),
+      ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"),
+      ("1445240543363", "delete", "{}"),
+      ("1445240543365", "update", "{\"time\":1, \"weight\":-10}"))
+
+    val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
+
+    val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
+      Management.toEdge(ts.toLong, op, srcId, tgtId, labelName, "out", 
props).toLogString
+    }).mkString("\n")
+
+    val expected = Seq(
+      Seq("1445240543366", "update", "e", "1", "2", testLabelName, 
"{\"is_blocked\":true}"),
+      Seq("1445240543362", "insert", "e", "1", "2", testLabelName, 
"{\"is_hidden\":false}"),
+      Seq("1445240543364", "insert", "e", "1", "2", testLabelName, 
"{\"is_hidden\":false,\"weight\":10}"),
+      Seq("1445240543363", "delete", "e", "1", "2", testLabelName),
+      Seq("1445240543365", "update", "e", "1", "2", testLabelName, 
"{\"time\":1,\"weight\":-10}")
+    ).map(_.mkString("\t")).mkString("\n")
+
+    assert(bulkEdge === expected)
+  }
+
   test("buildOperation") {
     val schemaVersion = "v2"
     val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala 
b/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala
deleted file mode 100644
index 9a58bd0..0000000
--- a/s2core/src/test/scala/com/kakao/s2graph/core/GraphTest.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.kakao.s2graph.core
-
-/**
- * Created by shon on 5/29/15.
- */
-class GraphTest {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
new file mode 100644
index 0000000..1c09778
--- /dev/null
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/CrudTest.scala
@@ -0,0 +1,226 @@
+package com.kakao.s2graph.core.Integrate
+
+import com.kakao.s2graph.core.mysqls._
+import play.api.libs.json.{JsObject, Json}
+
+class CrudTest extends IntegrateCommon {
+  import CrudHelper._
+  import TestUtil._
+
+  test("test CRUD") {
+    var tcNum = 0
+    var tcString = ""
+    var bulkQueries = List.empty[(Long, String, String)]
+    var expected = Map.empty[String, String]
+
+    val curTime = System.currentTimeMillis
+    val t1 = curTime + 0
+    val t2 = curTime + 1
+    val t3 = curTime + 2
+    val t4 = curTime + 3
+    val t5 = curTime + 4
+
+    val tcRunner = new CrudTestRunner()
+    tcNum = 1
+    tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) 
test "
+
+    bulkQueries = List(
+      (t1, "insert", "{\"time\": 10}"),
+      (t2, "delete", ""),
+      (t3, "insert", "{\"time\": 10, \"weight\": 20}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 2
+    tcString = "[t1 -> t2 -> t3 test case] insert(t1) delete(t2) insert(t3) 
test "
+    bulkQueries = List(
+      (t1, "insert", "{\"time\": 10}"),
+      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
+      (t2, "delete", ""))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 3
+    tcString = "[t3 -> t2 -> t1 test case] insert(t3) delete(t2) insert(t1) 
test "
+    bulkQueries = List(
+      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
+      (t2, "delete", ""),
+      (t1, "insert", "{\"time\": 10}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 4
+    tcString = "[t3 -> t1 -> t2 test case] insert(t3) insert(t1) delete(t2) 
test "
+    bulkQueries = List(
+      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
+      (t1, "insert", "{\"time\": 10}"),
+      (t2, "delete", ""))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 5
+    tcString = "[t2 -> t1 -> t3 test case] delete(t2) insert(t1) insert(t3) 
test"
+    bulkQueries = List(
+      (t2, "delete", ""),
+      (t1, "insert", "{\"time\": 10}"),
+      (t3, "insert", "{\"time\": 10, \"weight\": 20}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 6
+    tcString = "[t2 -> t3 -> t1 test case] delete(t2) insert(t3) insert(t1) 
test "
+    bulkQueries = List(
+      (t2, "delete", ""),
+      (t3, "insert", "{\"time\": 10, \"weight\": 20}"),
+      (t1, "insert", "{\"time\": 10}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 7
+    tcString = "[t1 -> t2 -> t3 test case] update(t1) delete(t2) update(t3) 
test "
+    bulkQueries = List(
+      (t1, "update", "{\"time\": 10}"),
+      (t2, "delete", ""),
+      (t3, "update", "{\"time\": 10, \"weight\": 20}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+    tcNum = 8
+    tcString = "[t1 -> t3 -> t2 test case] update(t1) update(t3) delete(t2) 
test "
+    bulkQueries = List(
+      (t1, "update", "{\"time\": 10}"),
+      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
+      (t2, "delete", ""))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+    tcNum = 9
+    tcString = "[t2 -> t1 -> t3 test case] delete(t2) update(t1) update(t3) 
test "
+    bulkQueries = List(
+      (t2, "delete", ""),
+      (t1, "update", "{\"time\": 10}"),
+      (t3, "update", "{\"time\": 10, \"weight\": 20}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+    tcNum = 10
+    tcString = "[t2 -> t3 -> t1 test case] delete(t2) update(t3) update(t1) 
test"
+    bulkQueries = List(
+      (t2, "delete", ""),
+      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
+      (t1, "update", "{\"time\": 10}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+    tcNum = 11
+    tcString = "[t3 -> t2 -> t1 test case] update(t3) delete(t2) update(t1) 
test "
+    bulkQueries = List(
+      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
+      (t2, "delete", ""),
+      (t1, "update", "{\"time\": 10}"))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+    tcNum = 12
+    tcString = "[t3 -> t1 -> t2 test case] update(t3) update(t1) delete(t2) 
test "
+    bulkQueries = List(
+      (t3, "update", "{\"time\": 10, \"weight\": 20}"),
+      (t1, "update", "{\"time\": 10}"),
+      (t2, "delete", ""))
+    expected = Map("time" -> "10", "weight" -> "20")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+
+    tcNum = 13
+    tcString = "[t5 -> t1 -> t3 -> t2 -> t4 test case] update(t5) insert(t1) 
insert(t3) delete(t2) update(t4) test "
+    bulkQueries = List(
+      (t5, "update", "{\"is_blocked\": true}"),
+      (t1, "insert", "{\"is_hidden\": false}"),
+      (t3, "insert", "{\"is_hidden\": false, \"weight\": 10}"),
+      (t2, "delete", ""),
+      (t4, "update", "{\"time\": 1, \"weight\": -10}"))
+    expected = Map("time" -> "1", "weight" -> "-10", "is_hidden" -> "false", 
"is_blocked" -> "true")
+
+    tcRunner.run(tcNum, tcString, bulkQueries, expected)
+  }
+
+
+  object CrudHelper {
+
+    class CrudTestRunner {
+      var seed = 0
+
+      def run(tcNum: Int, tcString: String, opWithProps: List[(Long, String, 
String)], expected: Map[String, String]) = {
+        for {
+          labelName <- List(testLabelName, testLabelName2)
+          i <- 0 until NumOfEachTest
+        } {
+          seed += 1
+          val srcId = seed.toString
+          val tgtId = srcId
+
+          val maxTs = opWithProps.map(t => t._1).max
+
+          /** insert edges */
+          println(s"---- TC${tcNum}_init ----")
+          val bulkEdges = (for ((ts, op, props) <- opWithProps) yield {
+            TestUtil.toEdge(ts, op, "e", srcId, tgtId, labelName, props)
+          })
+
+          TestUtil.insertEdgesSync(bulkEdges: _*)
+
+          for {
+            label <- Label.findByName(labelName)
+            direction <- List("out", "in")
+            cacheTTL <- List(-1L)
+          } {
+            val (serviceName, columnName, id, otherId) = direction match {
+              case "out" => (label.srcService.serviceName, 
label.srcColumn.columnName, srcId, tgtId)
+              case "in" => (label.tgtService.serviceName, 
label.tgtColumn.columnName, tgtId, srcId)
+            }
+
+            val qId = if (labelName == testLabelName) id else "\"" + id + "\""
+            val query = queryJson(serviceName, columnName, labelName, qId, 
direction, cacheTTL)
+
+            val jsResult = TestUtil.getEdgesSync(query)
+
+            val results = jsResult \ "results"
+            val deegrees = (jsResult \ "degrees").as[List[JsObject]]
+            val propsLs = (results \\ "props").seq
+            (deegrees.head \ LabelMeta.degree.name).as[Int] should be(1)
+
+            val from = (results \\ "from").seq.last.toString.replaceAll("\"", 
"")
+            val to = (results \\ "to").seq.last.toString.replaceAll("\"", "")
+
+            from should be(id.toString)
+            to should be(otherId.toString)
+            (results \\ "_timestamp").seq.last.as[Long] should be(maxTs)
+
+            for ((key, expectedVal) <- expected) {
+              propsLs.last.as[JsObject].keys.contains(key) should be(true)
+              (propsLs.last \ key).toString should be(expectedVal)
+            }
+          }
+        }
+      }
+
+      def queryJson(serviceName: String, columnName: String, labelName: 
String, id: String, dir: String, cacheTTL: Long = -1L) = Json.parse(
+        s""" { "srcVertices": [
+             { "serviceName": "$serviceName",
+               "columnName": "$columnName",
+               "id": $id } ],
+             "steps": [ [ {
+             "label": "$labelName",
+             "direction": "$dir",
+             "offset": 0,
+             "limit": 10,
+             "cacheTTL": $cacheTTL }]]}""")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/f2311f25/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
new file mode 100644
index 0000000..0ced48f
--- /dev/null
+++ 
b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
@@ -0,0 +1,311 @@
+package com.kakao.s2graph.core.Integrate
+
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.mysqls.Label
+import com.kakao.s2graph.core.rest.RequestParser
+import com.kakao.s2graph.core.utils.logger
+import com.typesafe.config._
+import org.scalatest._
+import play.api.libs.json.{JsValue, Json}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
+
+  import TestUtil._
+
+  var graph: Graph = _
+  var parser: RequestParser = _
+  var config: Config = _
+
+  override def beforeAll = {
+    config = ConfigFactory.load()
+    graph = new Graph(config)(ExecutionContext.Implicits.global)
+    parser = new RequestParser(graph.config)
+    initTestData()
+  }
+
+  override def afterAll(): Unit = {
+    graph.shutdown()
+  }
+
+  /**
+    * Make Service, Label, Vertex for integrate test
+    */
+  def initTestData() = {
+    println("[init start]: 
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+    Management.deleteService(testServiceName)
+
+    // 1. createService
+    val jsValue = Json.parse(createService)
+    val (serviceName, cluster, tableName, preSplitSize, ttl, 
compressionAlgorithm) =
+      parser.toServiceElements(jsValue)
+
+    val tryRes =
+      Management.createService(serviceName, cluster, tableName, preSplitSize, 
ttl, compressionAlgorithm)
+    println(s">> Service created : $createService, $tryRes")
+
+    val labelNames = Map(testLabelName -> testLabelNameCreate,
+      testLabelName2 -> testLabelName2Create,
+      testLabelNameV1 -> testLabelNameV1Create,
+      testLabelNameWeak -> testLabelNameWeakCreate)
+
+    for {
+      (labelName, create) <- labelNames
+    } {
+      Management.deleteLabel(labelName)
+      Label.findByName(labelName, useCache = false) match {
+        case None =>
+          val json = Json.parse(create)
+          val tryRes = for {
+            labelArgs <- parser.toLabelElements(json)
+            label <- (Management.createLabel _).tupled(labelArgs)
+          } yield label
+
+          tryRes.get
+        case Some(label) =>
+          println(s">> Label already exist: $create, $label")
+      }
+    }
+
+    val vertexPropsKeys = List("age" -> "int")
+
+    vertexPropsKeys.map { case (key, keyType) =>
+      Management.addVertexProp(testServiceName, testColumnName, key, keyType)
+    }
+
+    println("[init end]: >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
+  }
+
+
+  /**
+    * Test Helpers
+    */
+  object TestUtil {
+    implicit def ec = scala.concurrent.ExecutionContext.global
+
+    //    def checkEdgeQueryJson(params: Seq[(String, String, String, 
String)]) = {
+    //      val arr = for {
+    //        (label, dir, from, to) <- params
+    //      } yield {
+    //        Json.obj("label" -> label, "direction" -> dir, "from" -> from, 
"to" -> to)
+    //      }
+    //
+    //      val s = Json.toJson(arr)
+    //      s
+    //    }
+
+    def deleteAllSync(jsValue: JsValue) = {
+      val future = Future.sequence(jsValue.as[Seq[JsValue]] map { json =>
+        val (labels, direction, ids, ts, vertices) = parser.toDeleteParam(json)
+        val future = graph.deleteAllAdjacentEdges(vertices.toList, labels, 
GraphUtil.directions(direction), ts)
+
+        future
+      })
+
+      Await.result(future, HttpRequestWaitingTime)
+    }
+
+    def getEdgesSync(queryJson: JsValue): JsValue = {
+      logger.info(Json.prettyPrint(queryJson))
+
+      val ret = graph.getEdges(parser.toQuery(queryJson))
+      val result = Await.result(ret, HttpRequestWaitingTime)
+      val jsResult = PostProcess.toSimpleVertexArrJson(result)
+
+      jsResult
+    }
+
+    def insertEdgesSync(bulkEdges: String*) = {
+      val req = 
graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait 
= true)
+      val jsResult = Await.result(req, HttpRequestWaitingTime)
+
+      jsResult
+    }
+
+    def insertEdgesAsync(bulkEdges: String*) = {
+      val req = 
graph.mutateElements(parser.toGraphElements(bulkEdges.mkString("\n")), withWait 
= true)
+      req
+    }
+
+    def toEdge(elems: Any*): String = elems.mkString("\t")
+
+    // common tables
+    val testServiceName = "s2graph"
+    val testLabelName = "s2graph_label_test"
+    val testLabelName2 = "s2graph_label_test_2"
+    val testLabelNameV1 = "s2graph_label_test_v1"
+    val testLabelNameWeak = "s2graph_label_test_weak"
+    val testColumnName = "user_id_test"
+    val testColumnType = "long"
+    val testTgtColumnName = "item_id_test"
+    val testHTableName = "test-htable"
+    val newHTableName = "new-htable"
+    val index1 = "idx_1"
+    val index2 = "idx_2"
+
+    val NumOfEachTest = 30
+    val HttpRequestWaitingTime = Duration("60 seconds")
+
+    val createService = s"""{"serviceName" : "$testServiceName"}"""
+
+    val testLabelNameCreate =
+      s"""
+  {
+    "label": "$testLabelName",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "$testColumnName",
+    "tgtColumnType": "long",
+    "indices": [
+      {"name": "$index1", "propNames": ["weight", "time", "is_hidden", 
"is_blocked"]},
+      {"name": "$index2", "propNames": ["_timestamp"]}
+    ],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "strong",
+    "schemaVersion": "v2",
+    "compressionAlgorithm": "gz",
+    "hTableName": "$testHTableName"
+  }"""
+
+    val testLabelName2Create =
+      s"""
+  {
+    "label": "$testLabelName2",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "$testTgtColumnName",
+    "tgtColumnType": "string",
+    "indices": [{"name": "$index1", "propNames": ["time", "weight", 
"is_hidden", "is_blocked"]}],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "strong",
+    "isDirected": false,
+    "schemaVersion": "v3",
+    "compressionAlgorithm": "gz"
+  }"""
+
+    val testLabelNameV1Create =
+      s"""
+  {
+    "label": "$testLabelNameV1",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "${testTgtColumnName}_v1",
+    "tgtColumnType": "string",
+    "indices": [{"name": "$index1", "propNames": ["time", "weight", 
"is_hidden", "is_blocked"]}],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "strong",
+    "isDirected": true,
+    "schemaVersion": "v1",
+    "compressionAlgorithm": "gz"
+  }"""
+
+    val testLabelNameWeakCreate =
+      s"""
+  {
+    "label": "$testLabelNameWeak",
+    "srcServiceName": "$testServiceName",
+    "srcColumnName": "$testColumnName",
+    "srcColumnType": "long",
+    "tgtServiceName": "$testServiceName",
+    "tgtColumnName": "$testTgtColumnName",
+    "tgtColumnType": "string",
+    "indices": [{"name": "$index1", "propNames": ["time", "weight", 
"is_hidden", "is_blocked"]}],
+    "props": [
+    {
+      "name": "time",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "weight",
+      "dataType": "long",
+      "defaultValue": 0
+    },
+    {
+      "name": "is_hidden",
+      "dataType": "boolean",
+      "defaultValue": false
+    },
+    {
+      "name": "is_blocked",
+      "dataType": "boolean",
+      "defaultValue": false
+    }
+    ],
+    "consistencyLevel": "weak",
+    "isDirected": true,
+    "compressionAlgorithm": "gz"
+  }"""
+  }
+}

Reply via email to