http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
deleted file mode 100644
index f301d68..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/PostProcess.scala
+++ /dev/null
@@ -1,554 +0,0 @@
-package com.kakao.s2graph.core
-
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-
-import com.kakao.s2graph.core.mysqls.{ColumnMeta, Label, ServiceColumn, 
LabelMeta}
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike}
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.json.{Json, _}
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-
-object PostProcess extends JSONParser {
-
-
-  type EDGE_VALUES = Map[String, JsValue]
-  type ORDER_BY_VALUES =  (Any, Any, Any, Any)
-  type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
-
-  /**
-   * Result Entity score field name
-   */
-  val emptyDegrees = Seq.empty[JsValue]
-  val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), 
"results" -> Json.arr(), "isTimeout" -> true)
-  val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" 
-> Json.arr(), "isEmpty" -> true)
-  def badRequestResults(ex: => Exception) = ex match {
-    case ex: BadQueryException => Json.obj("message" -> ex.msg)
-    case _ => Json.obj("message" -> ex.getMessage)
-  }
-
-  val SCORE_FIELD_NAME = "scoreSum"
-  val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", 
"_timestamp", "timestamp", "score", "props")
-
-  def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
exclude: Seq[QueryRequestWithResult]) = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
-    //    filterNot {case (edge, score) => 
edge.props.contains(LabelMeta.degreeSeq)}
-    val groupedEdgesWithRank = (for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
queryRequest.query.filterOutFields))
-    } yield {
-      (queryRequest.queryParam, edge, score)
-    }).groupBy {
-      case (queryParam, edge, rank) if edge.labelWithDir.dir == 
GraphUtil.directions("in") =>
-        (queryParam.label.srcColumn, queryParam.label.label, 
queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
-      case (queryParam, edge, rank) =>
-        (queryParam.label.tgtColumn, queryParam.label.label, 
queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
-    }
-
-    val ret = for {
-      ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) 
<- groupedEdgesWithRank if !isDegreeEdge
-      edgesWithRanks = edgesAndRanks.groupBy(x => 
x._2.srcVertex).map(_._2.head)
-      id <- innerValToJsValue(target, tgtColumn.columnType)
-    } yield {
-      Json.obj("name" -> tgtColumn.columnName, "id" -> id,
-        SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
-        "label" -> labelName,
-        "aggr" -> Json.obj(
-          "name" -> srcColumn.columnName,
-          "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
-            innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
-          },
-          "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
-            Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, 
srcColumn.columnType),
-              "props" -> propsToJson(edge),
-              "score" -> rank
-            )
-          }
-        )
-      )
-    }
-
-    ret.toList
-  }
-
-  def sortWithFormatted(jsons: Seq[JsObject],
-                        scoreField: String = "scoreSum",
-                        queryRequestWithResultLs: Seq[QueryRequestWithResult],
-                        decrease: Boolean = true): JsObject = {
-    val ordering = if (decrease) -1 else 1
-    val sortedJsons = jsons.sortBy { jsObject => (jsObject \ 
scoreField).as[Double] * ordering }
-
-    if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, 
"results" -> sortedJsons)
-    else Json.obj(
-      "size" -> sortedJsons.size,
-      "results" -> sortedJsons,
-      "impressionId" -> 
queryRequestWithResultLs.head.queryRequest.query.impressionId()
-    )
-  }
-
-  private def toHashKey(edge: Edge, queryParam: QueryParam, fields: 
Seq[String], delimiter: String = ","): Int = {
-    val ls = for {
-      field <- fields
-    } yield {
-      field match {
-        case "from" | "_from" => edge.srcVertex.innerId.toIdString()
-        case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
-        case "label" => edge.labelWithDir.labelId
-        case "direction" => 
JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
-        case "_timestamp" | "timestamp" => edge.ts
-        case _ =>
-          queryParam.label.metaPropsInvMap.get(field) match {
-            case None => throw new RuntimeException(s"unknow column: $field")
-            case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
-              case None => labelMeta.defaultValue
-              case Some(propVal) => propVal
-            }
-          }
-      }
-    }
-    val ret = ls.hashCode()
-    ret
-  }
-
-  def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
isSrcVertex: Boolean = false): Seq[Int] = {
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      q = queryRequest.query
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-    } yield  toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
-  }
-
-  def summarizeWithListExcludeFormatted(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs, decrease = true)
-  }
-
-  def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs)
-  }
-
-  def summarizeWithListFormatted(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
-    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
-    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs)
-  }
-
-  def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult]): JsValue = {
-    toSimpleVertexArrJson(queryRequestWithResultLs, 
Seq.empty[QueryRequestWithResult])
-  }
-
-  private def orderBy(queryOption: QueryOption,
-                      rawEdges: ListBuffer[(EDGE_VALUES, Double, 
ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
-    import com.kakao.s2graph.core.OrderingUtil._
-
-    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
-      val ascendingLs = queryOption.orderByColumns.map(_._2)
-      rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
-    } else {
-      rawEdges
-    }
-  }
-
-  private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, 
edge: Edge, column: String): Any = {
-    column match {
-      case "score" => score
-      case "timestamp" | "_timestamp" => edge.ts
-      case _ =>
-        keyWithJs.get(column) match {
-          case None => keyWithJs.get("props").map { js => (js \ 
column).as[JsValue] }.get
-          case Some(x) => x
-        }
-    }
-  }
-
-  private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): 
JsValue = {
-    def traverse(js: JsValue): JsValue = js match {
-      case JsNull => mapper(JsNull)
-      case JsUndefined() => mapper(JsUndefined(""))
-      case JsNumber(v) => mapper(js)
-      case JsString(v) => mapper(js)
-      case JsBoolean(v) => mapper(js)
-      case JsArray(elements) => JsArray(elements.map { t => 
traverse(mapper(t)) })
-      case JsObject(values) => JsObject(values.map { case (k, v) => k -> 
traverse(mapper(v)) })
-    }
-
-    traverse(jsValue)
-  }
-
-  /** test query with filterOut is not working since it can not diffrentate 
filterOut */
-  private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): 
JsValue = {
-    val cursors = _cursors.flatten.iterator
-
-    buildReplaceJson(jsonQuery) {
-      case js@JsObject(fields) =>
-        val isStep = fields.find { case (k, _) => k == "label" } // find label 
group
-        if (isStep.isEmpty) js
-        else {
-          // TODO: Order not ensured
-          val withCursor = js.fieldSet | Set("cursor" -> 
JsString(cursors.next))
-          JsObject(withCursor.toSeq)
-        }
-      case js => js
-    }
-  }
-
-  private def buildRawEdges(queryOption: QueryOption,
-                            queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
-                            excludeIds: Map[Int, Boolean],
-                            scoreWeight: Double = 1.0): (ListBuffer[JsValue], 
ListBuffer[RAW_EDGE]) = {
-    val degrees = ListBuffer[JsValue]()
-    val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]()
-    val metaPropNamesMap = scala.collection.mutable.Map[String, Int]()
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-    } yield {
-      val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      queryRequest.queryParam.label.metaPropNames.foreach { metaPropName =>
-        metaPropNamesMap.put(metaPropName, 
metaPropNamesMap.getOrElse(metaPropName, 0) + 1)
-      }
-    }
-    val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == 
queryRequestWithResultLs.length)
-    val orderByColumns = queryOption.orderByColumns.filter { case (column, _) 
=>
-      column match {
-        case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => 
true
-        case _ =>
-          propsExistInAll.contains(column)
-//          //TODO??
-//          false
-//          queryParam.label.metaPropNames.contains(column)
-      }
-    }
-
-    /** build result jsons */
-    for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      queryParam = queryRequest.queryParam
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      hashKey = toHashKey(edge, queryRequest.queryParam, 
queryOption.filterOutFields)
-      if !excludeIds.contains(hashKey)
-    } {
-
-      // edge to json
-      val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-      val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, 
srcColumn.columnType)
-      if (edge.isDegree && fromOpt.isDefined) {
-        if (queryOption.limitOpt.isEmpty) {
-          degrees += Json.obj(
-            "from" -> fromOpt.get,
-            "label" -> queryRequest.queryParam.label.label,
-            "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
-            LabelMeta.degree.name -> 
innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
-          )
-        }
-      } else {
-        val keyWithJs = edgeToJson(edge, score, queryRequest.query, 
queryRequest.queryParam)
-        val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match {
-          case 0 =>
-            (None, None, None, None)
-          case 1 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, None, None, None)
-          case 2 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, None, None)
-          case 3 =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, v3, None)
-          case _ =>
-            val it = orderByColumns.iterator
-            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
-            (v1, v2, v3, v4)
-        }
-
-        val currentEdge = (keyWithJs, score * scoreWeight, orderByValues)
-        rawEdges += currentEdge
-      }
-    }
-    (degrees, rawEdges)
-  }
-
-  private def buildResultJsValue(queryOption: QueryOption,
-                                 degrees: ListBuffer[JsValue],
-                                 rawEdges: ListBuffer[RAW_EDGE]): JsValue = {
-    if (queryOption.groupByColumns.isEmpty) {
-      // ordering
-      val filteredEdges = rawEdges.filter(t => t._2 >= 
queryOption.scoreThreshold)
-
-      val edges = queryOption.limitOpt match {
-        case None => orderBy(queryOption, filteredEdges).map(_._1)
-        case Some(limit) => orderBy(queryOption, 
filteredEdges).map(_._1).take(limit)
-      }
-      val resultDegrees = if (queryOption.returnDegree) degrees else 
emptyDegrees
-
-      Json.obj(
-        "size" -> edges.size,
-        "degrees" -> resultDegrees,
-//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
-        "results" -> edges
-      )
-    } else {
-      val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
-        val props = keyWithJs.get("props")
-
-        for {
-          column <- queryOption.groupByColumns
-          value <- keyWithJs.get(column) match {
-            case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
-            case Some(x) => Some(x)
-          }
-        } yield column -> value
-      }
-
-      val groupedEdgesWithScoreSum =
-        for {
-          (groupByKeyVals, groupedRawEdges) <- grouped
-          scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= 
queryOption.scoreThreshold
-        } yield {
-          // ordering
-          val edges = orderBy(queryOption, groupedRawEdges).map(_._1)
-
-          //TODO: refactor this
-          val js = if (queryOption.returnAgg)
-            Json.obj(
-              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
-              "scoreSum" -> scoreSum,
-              "agg" -> edges
-            )
-          else
-            Json.obj(
-              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
-              "scoreSum" -> scoreSum,
-              "agg" -> Json.arr()
-            )
-          (js, scoreSum)
-        }
-
-      val groupedSortedJsons = queryOption.limitOpt match {
-        case None =>
-          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => 
scoreSum * -1 }.map(_._1)
-        case Some(limit) =>
-          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => 
scoreSum * -1 }.map(_._1).take(limit)
-      }
-      val resultDegrees = if (queryOption.returnDegree) degrees else 
emptyDegrees
-      Json.obj(
-        "size" -> groupedSortedJsons.size,
-        "degrees" -> resultDegrees,
-//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
-        "results" -> groupedSortedJsons
-      )
-    }
-  }
-
-  def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
-                                 resultWithExcludeLs: 
Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
-                                 excludes: Seq[QueryRequestWithResult]): 
JsValue = {
-    val excludeIds = (Seq((Seq.empty, excludes)) ++ 
resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, 
excludes)) =>
-      acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
-    }
-
-    val (degrees, rawEdges) = (ListBuffer.empty[JsValue], 
ListBuffer.empty[RAW_EDGE])
-    for {
-      (result, localExclude) <- resultWithExcludeLs
-    } {
-      val newResult = result.map { queryRequestWithResult =>
-        val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-        val newQuery = queryRequest.query.copy(queryOption = queryOption)
-        queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = 
newQuery))
-      }
-      val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, 
excludeIds)
-      degrees ++= _degrees
-      rawEdges ++= _rawEdges
-    }
-    buildResultJsValue(queryOption, degrees, rawEdges)
-  }
-
-  def toSimpleVertexArrJson(queryOption: QueryOption,
-                            queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
-                            exclude: Seq[QueryRequestWithResult]): JsValue = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
-    val (degrees, rawEdges) = buildRawEdges(queryOption, 
queryRequestWithResultLs, excludeIds)
-    buildResultJsValue(queryOption, degrees, rawEdges)
-  }
-
-
-  def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
-                            exclude: Seq[QueryRequestWithResult]): JsValue = {
-
-    queryRequestWithResultLs.headOption.map { queryRequestWithResult =>
-      val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      val query = queryRequest.query
-      val queryOption = query.queryOption
-      val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
-      val (degrees, rawEdges) = buildRawEdges(queryOption, 
queryRequestWithResultLs, excludeIds)
-      buildResultJsValue(queryOption, degrees, rawEdges)
-    } getOrElse emptyResults
-  }
-
-  def verticesToJson(vertices: Iterable[Vertex]) = {
-    Json.toJson(vertices.flatMap { v => vertexToJson(v) })
-  }
-
-  def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, 
JsValue] = {
-    for {
-      (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
-      labelMeta <- queryParam.label.metaPropsMap.get(seq)
-      jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
-    } yield labelMeta.name -> jsValue
-  }
-
-  private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, 
queryParam: QueryParam): JsValue = {
-    if (parentEdges.isEmpty) {
-      JsNull
-    } else {
-      val parents = for {
-        parent <- parentEdges
-        (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
-        parentQueryParam = QueryParam(parentEdge.labelWithDir)
-        parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if 
parents != JsNull
-      } yield {
-        val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
-        val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, 
parentQueryParam) + ("parents" -> parents)
-        Json.toJson(edgeJson)
-      }
-
-      Json.toJson(parents)
-    }
-  }
-
-  /** TODO */
-  def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: 
QueryParam): Map[String, JsValue] = {
-    val (srcColumn, tgtColumn) = 
queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
-
-    val kvMapOpt = for {
-      from <- innerValToJsValue(edge.srcVertex.id.innerId, 
srcColumn.columnType)
-      to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
-    } yield {
-      val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else 
(reservedColumns & q.selectColumnsSet) + "props"
-      val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ 
propsToJson(edge, q, queryParam)
-      val propsMap = if (q.selectColumnsSet.nonEmpty) 
_propsMap.filterKeys(q.selectColumnsSet) else _propsMap
-
-      val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, 
column) =>
-        val jsValue = column match {
-          case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - 
(System.currentTimeMillis() - queryParam.timestamp))
-          case "from" => from
-          case "to" => to
-          case "label" => JsString(queryParam.label.label)
-          case "direction" => 
JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
-          case "_timestamp" | "timestamp" => JsNumber(edge.ts)
-          case "score" => JsNumber(score)
-          case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
-          case _ => JsNull
-        }
-
-        if (jsValue == JsNull) map else map + (column -> jsValue)
-      }
-      kvMap
-    }
-
-    kvMapOpt.getOrElse(Map.empty)
-  }
-
-  def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): 
Map[String, JsValue] = {
-    val kvs = edgeToJsonInner(edge, score, q, queryParam)
-    if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> 
Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
-    else kvs
-  }
-
-  def vertexToJson(vertex: Vertex): Option[JsObject] = {
-    val serviceColumn = ServiceColumn.findById(vertex.id.colId)
-
-    for {
-      id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
-    } yield {
-      Json.obj("serviceName" -> serviceColumn.service.serviceName,
-        "columnName" -> serviceColumn.columnName,
-        "id" -> id, "props" -> propsToJson(vertex),
-        "timestamp" -> vertex.ts,
-        //        "belongsTo" -> vertex.belongLabelIds)
-        "belongsTo" -> 
vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
-    }
-  }
-
-  private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, 
InnerValLike]) = {
-    for {
-      (seq, value) <- props
-      name <- seqsToNames.get(seq)
-    } yield (name, value)
-  }
-
-  private def propsToJson(vertex: Vertex) = {
-    val serviceColumn = vertex.serviceColumn
-    val props = for {
-      (propKey, innerVal) <- vertex.props
-      columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, 
propKey.toByte, useCache = true)
-      jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
-    } yield {
-      (columnMeta.name -> jsValue)
-    }
-    props.toMap
-  }
-
-  @deprecated(message = "deprecated", since = "0.2")
-  def propsToJson(edge: Edge) = {
-    for {
-      (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
-      metaProp <- edge.label.metaPropsMap.get(seq)
-      jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
-    } yield {
-      (metaProp.name, jsValue)
-    }
-  }
-
-  @deprecated(message = "deprecated", since = "0.2")
-  def summarizeWithListExclude(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
-    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
-
-
-    val groupedEdgesWithRank = (for {
-      queryRequestWithResult <- queryRequestWithResultLs
-      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
-      edgeWithScore <- queryResult.edgeWithScoreLs
-      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
queryRequest.query.filterOutFields))
-    } yield {
-      (edge, score)
-    }).groupBy { case (edge, score) =>
-      (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
-    }
-
-    val jsons = for {
-      ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
-      (edges, ranks) = edgesAndRanks.groupBy(x => 
x._1.srcVertex).map(_._2.head).unzip
-      tgtId <- innerValToJsValue(target, tgtColumn.columnType)
-    } yield {
-      Json.obj(tgtColumn.columnName -> tgtId,
-        s"${srcColumn.columnName}s" ->
-          edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, 
srcColumn.columnType)), "scoreSum" -> ranks.sum)
-    }
-    val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ 
"scoreSum").as[Double] }.reverse
-    if (queryRequestWithResultLs.isEmpty) {
-      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
-    } else {
-      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
-        "impressionId" -> 
queryRequestWithResultLs.head.queryRequest.query.impressionId())
-    }
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
deleted file mode 100644
index 0effa07..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryParam.scala
+++ /dev/null
@@ -1,583 +0,0 @@
-package com.kakao.s2graph.core
-
-import com.google.common.hash.Hashing
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.parsers.{Where, WhereParser}
-import com.kakao.s2graph.core.types._
-import org.apache.hadoop.hbase.util.Bytes
-import org.hbase.async.ColumnRangeFilter
-import play.api.libs.json.{JsNull, JsNumber, JsValue, Json}
-
-import scala.util.hashing.MurmurHash3
-import scala.util.{Success, Try}
-
-object Query {
-  val initialScore = 1.0
-  lazy val empty = Query()
-
-  def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = 
Query(srcVertices, Vector(Step(List(queryParam))))
-
-  object DuplicatePolicy extends Enumeration {
-    type DuplicatePolicy = Value
-    val First, Sum, CountSum, Raw = Value
-
-    def apply(policy: String): Value = {
-      policy match {
-        case "sum" => Query.DuplicatePolicy.Sum
-        case "countSum" => Query.DuplicatePolicy.CountSum
-        case "raw" => Query.DuplicatePolicy.Raw
-        case _ => DuplicatePolicy.First
-      }
-    }
-  }
-}
-
-case class MultiQuery(queries: Seq[Query],
-                      weights: Seq[Double],
-                      queryOption: QueryOption,
-                      jsonQuery: JsValue = JsNull)
-
-case class QueryOption(removeCycle: Boolean = false,
-                       selectColumns: Seq[String] = Seq.empty,
-                       groupByColumns: Seq[String] = Seq.empty,
-                       orderByColumns: Seq[(String, Boolean)] = Seq.empty,
-                       filterOutQuery: Option[Query] = None,
-                       filterOutFields: Seq[String] = Seq(LabelMeta.to.name),
-                       withScore: Boolean = true,
-                       returnTree: Boolean = false,
-                       limitOpt: Option[Int] = None,
-                       returnAgg: Boolean = true,
-                       scoreThreshold: Double = Double.MinValue,
-                       returnDegree: Boolean = true)
-
-
-case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
-                 steps: IndexedSeq[Step] = Vector.empty[Step],
-                 queryOption: QueryOption = QueryOption(),
-                 jsonQuery: JsValue = JsNull) {
-
-  val removeCycle = queryOption.removeCycle
-  val selectColumns = queryOption.selectColumns
-//  val groupBy = queryOption.groupBy
-  val groupByColumns = queryOption.groupByColumns
-  val orderByColumns = queryOption.orderByColumns
-  val filterOutQuery = queryOption.filterOutQuery
-  val filterOutFields = queryOption.filterOutFields
-  val withScore = queryOption.withScore
-  val returnTree = queryOption.returnTree
-  val limitOpt = queryOption.limitOpt
-  val returnAgg = queryOption.returnAgg
-  val returnDegree = queryOption.returnDegree
-
-  def cacheKeyBytes: Array[Byte] = {
-    val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
-    val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString)
-    val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
-    val filterOutBytes = 
queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
-    val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
-
-    Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, 
returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add)
-  }
-
-  lazy val selectColumnsSet = queryOption.selectColumns.map { c =>
-    if (c == "_from") "from"
-    else if (c == "_to") "to"
-    else c
-  }.toSet
-
-  /** return logical query id without considering parameter values */
-  def templateId(): JsValue = {
-    Json.toJson(for {
-      step <- steps
-      queryParam <- step.queryParams.sortBy(_.labelWithDir.labelId)
-    } yield {
-        Json.obj("label" -> queryParam.label.label, "direction" -> 
GraphUtil.fromDirection(queryParam.labelWithDir.dir))
-      })
-  }
-
-  def impressionId(): JsNumber = {
-    val hash = MurmurHash3.stringHash(templateId().toString())
-    JsNumber(hash)
-  }
-
-  def cursorStrings(): Seq[Seq[String]] = {
-    //Don`t know how to replace all cursor keys in json
-    steps.map { step =>
-      step.queryParams.map { queryParam =>
-        queryParam.cursorOpt.getOrElse("")
-      }
-    }
-  }
-}
-
-object EdgeTransformer {
-  val DefaultTransformField = Json.arr("_to")
-  val DefaultTransformFieldAsList = Json.arr("_to").as[List[String]]
-  val DefaultJson = Json.arr(DefaultTransformField)
-}
-
-/**
- * TODO: step wise outputFields should be used with nextStepLimit, 
nextStepThreshold.
- * @param jsValue
- */
-case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) {
-  val Delimiter = "\\$"
-  val targets = jsValue.asOpt[List[Vector[String]]].toList
-  val fieldsLs = for {
-    target <- targets
-    fields <- target
-  } yield fields
-  val isDefault = fieldsLs.size == 1 && fieldsLs.head.size == 1 && 
(fieldsLs.head.head == "_to" || fieldsLs.head.head == "to")
-
-  def toHashKeyBytes: Array[Byte] = if (isDefault) Array.empty[Byte] else 
Bytes.toBytes(jsValue.toString)
-
-  def replace(fmt: String,
-              values: Seq[InnerValLike],
-              nextStepOpt: Option[Step]): Seq[InnerValLike] = {
-
-    val tokens = fmt.split(Delimiter)
-    val _values = values.padTo(tokens.length, InnerVal.withStr("", 
queryParam.label.schemaVersion))
-    val mergedStr = tokens.zip(_values).map { case (prefix, innerVal) => 
prefix + innerVal.toString }.mkString
-    //    logger.error(s"${tokens.toList}, ${values}, $mergedStr")
-    //    println(s"${tokens.toList}, ${values}, $mergedStr")
-    nextStepOpt match {
-      case None =>
-        val columnType =
-          if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) 
queryParam.label.tgtColumnType
-          else queryParam.label.srcColumnType
-
-        if (columnType == InnerVal.STRING) Seq(InnerVal.withStr(mergedStr, 
queryParam.label.schemaVersion))
-        else Nil
-      case Some(nextStep) =>
-        val nextQueryParamsValid = nextStep.queryParams.filter { qParam =>
-          if (qParam.labelWithDir.dir == GraphUtil.directions("out")) 
qParam.label.srcColumnType == "string"
-          else qParam.label.tgtColumnType == "string"
-        }
-        for {
-          nextQueryParam <- nextQueryParamsValid
-        } yield {
-          InnerVal.withStr(mergedStr, nextQueryParam.label.schemaVersion)
-        }
-    }
-  }
-
-  def toInnerValOpt(edge: Edge, fieldName: String): Option[InnerValLike] = {
-    fieldName match {
-      case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
-      case LabelMeta.from.name => Option(edge.srcVertex.innerId)
-      case _ =>
-        for {
-          labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName)
-          value <- edge.propsWithTs.get(labelMeta.seq)
-        } yield value.innerVal
-    }
-  }
-
-  def transform(edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
-    if (isDefault) Seq(edge)
-    else {
-      val edges = for {
-        fields <- fieldsLs
-        innerVal <- {
-          if (fields.size == 1) {
-            val fieldName = fields.head
-            toInnerValOpt(edge, fieldName).toSeq
-          } else {
-            val fmt +: fieldNames = fields
-            replace(fmt, fieldNames.flatMap(fieldName => toInnerValOpt(edge, 
fieldName)), nextStepOpt)
-          }
-        }
-      } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = 
Option(edge))
-
-
-      edges
-    }
-  }
-}
-
-object Step {
-  val Delimiter = "|"
-}
-
-case class Step(queryParams: List[QueryParam],
-                labelWeights: Map[Int, Double] = Map.empty,
-                //                scoreThreshold: Double = 0.0,
-                nextStepScoreThreshold: Double = 0.0,
-                nextStepLimit: Int = -1,
-                cacheTTL: Long = -1) {
-
-  lazy val excludes = queryParams.filter(_.exclude)
-  lazy val includes = queryParams.filterNot(_.exclude)
-  lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
-
-  def toCacheKey(lss: Seq[Long]): Long = 
Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong()
-//    MurmurHash3.bytesHash(toCacheKeyRaw(lss))
-
-  def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = {
-    var bytes = Array.empty[Byte]
-    lss.sorted.foreach { h => bytes = Bytes.add(bytes, Bytes.toBytes(h)) }
-    bytes
-  }
-}
-
-case class VertexParam(vertices: Seq[Vertex]) {
-  var filters: Option[Map[Byte, InnerValLike]] = None
-
-  def has(what: Option[Map[Byte, InnerValLike]]): VertexParam = {
-    what match {
-      case None => this
-      case Some(w) => has(w)
-    }
-  }
-
-  def has(what: Map[Byte, InnerValLike]): VertexParam = {
-    this.filters = Some(what)
-    this
-  }
-
-}
-
-//object RankParam {
-//  def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = {
-//    new RankParam(labelId, keyAndWeights)
-//  }
-//}
-
-case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = 
Seq.empty[(Byte, Double)]) {
-  // empty => Count
-  lazy val rankKeysWeightsMap = keySeqAndWeights.toMap
-
-  def defaultKey() = {
-    this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0))
-    this
-  }
-
-  def toHashKeyBytes(): Array[Byte] = {
-    var bytes = Array.empty[Byte]
-    keySeqAndWeights.map { case (key, weight) =>
-      bytes = Bytes.add(bytes, Array.fill(1)(key), Bytes.toBytes(weight))
-    }
-    bytes
-  }
-}
-
-object QueryParam {
-  lazy val Empty = QueryParam(LabelWithDirection(0, 0))
-  lazy val DefaultThreshold = Double.MinValue
-  val Delimiter = ","
-}
-
-case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = 
System.currentTimeMillis()) {
-
-  import HBaseSerializable._
-  import Query.DuplicatePolicy
-  import Query.DuplicatePolicy._
-
-  lazy val label = Label.findById(labelWithDir.labelId)
-  val DefaultKey = LabelIndex.DefaultSeq
-  val fullKey = DefaultKey
-
-  var labelOrderSeq = fullKey
-
-  var sample = -1
-  var limit = 10
-  var offset = 0
-  var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1))
-
-  var duration: Option[(Long, Long)] = None
-  var isInverted: Boolean = false
-
-  var columnRangeFilter: ColumnRangeFilter = null
-
-  var hasFilters: Map[Byte, InnerValLike] = Map.empty[Byte, InnerValLike]
-  var where: Try[Where] = Success(WhereParser.success)
-  var whereRawOpt: Option[String] = None
-  var duplicatePolicy = DuplicatePolicy.First
-  var rpcTimeoutInMillis = 1000
-  var maxAttempt = 2
-  var includeDegree = false
-  var tgtVertexInnerIdOpt: Option[InnerValLike] = None
-  var cacheTTLInMillis: Long = -1L
-  var threshold = QueryParam.DefaultThreshold
-  var timeDecay: Option[TimeDecay] = None
-  var transformer: EdgeTransformer = EdgeTransformer(this, 
EdgeTransformer.DefaultJson)
-  var scorePropagateOp: String = "multiply"
-  var exclude = false
-  var include = false
-  var shouldNormalize= false
-  var cursorOpt: Option[String] = None
-
-  var columnRangeFilterMinBytes = Array.empty[Byte]
-  var columnRangeFilterMaxBytes = Array.empty[Byte]
-
-  lazy val srcColumnWithDir = label.srcColumnWithDir(labelWithDir.dir)
-  lazy val tgtColumnWithDir = label.tgtColumnWithDir(labelWithDir.dir)
-
-  def toBytes(idxSeq: Byte, offset: Int, limit: Int, isInverted: Boolean): 
Array[Byte] = {
-    val front = Array[Byte](idxSeq, if (isInverted) 1.toByte else 0.toByte)
-    Bytes.add(front, Bytes.toBytes((offset.toLong << 32 | limit)))
-  }
-
-  /**
-   * consider only I/O specific parameters.
-   * properties that is used on Graph.filterEdges should not be considered.
-   * @param bytes
-   * @return
-   */
-  def toCacheKey(bytes: Array[Byte]): Long = {
-    val hashBytes = toCacheKeyRaw(bytes)
-    Hashing.murmur3_128().hashBytes(hashBytes).asLong()
-//    MurmurHash3.bytesHash(hashBytes)
-  }
-
-  def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = {
-    val transformBytes = transformer.toHashKeyBytes
-    //TODO: change this to binrary format.
-    val whereBytes = Bytes.toBytes(whereRawOpt.getOrElse(""))
-    val durationBytes = duration.map { case (min, max) =>
-      val minTs = min / cacheTTLInMillis
-      val maxTs = max / cacheTTLInMillis
-      Bytes.add(Bytes.toBytes(minTs), Bytes.toBytes(maxTs))
-    } getOrElse Array.empty[Byte]
-//    Bytes.toBytes(duration.toString)
-    val conditionBytes = Bytes.add(transformBytes, whereBytes, durationBytes)
-    Bytes.add(Bytes.add(bytes, labelWithDir.bytes, toBytes(labelOrderSeq, 
offset, limit, isInverted)), rank.toHashKeyBytes(),
-      Bytes.add(columnRangeFilterMinBytes, columnRangeFilterMaxBytes, 
conditionBytes))
-  }
-
-  def isInverted(isInverted: Boolean): QueryParam = {
-    this.isInverted = isInverted
-    this
-  }
-
-  def labelOrderSeq(labelOrderSeq: Byte): QueryParam = {
-    this.labelOrderSeq = labelOrderSeq
-    this
-  }
-
-  def sample(n: Int): QueryParam = {
-    this.sample = n
-    this
-  }
-
-  def limit(offset: Int, limit: Int): QueryParam = {
-    /** since degree info is located on first always */
-    if (offset == 0 && this.columnRangeFilter == null) {
-      this.limit = limit + 1
-      this.offset = offset
-    } else {
-      this.limit = limit
-      this.offset = offset + 1
-    }
-    //    this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, 
this.offset)
-    this
-  }
-
-  def interval(fromTo: Option[(Seq[(Byte, InnerValLike)], Seq[(Byte, 
InnerValLike)])]): QueryParam = {
-    fromTo match {
-      case Some((from, to)) => interval(from, to)
-      case _ => this
-    }
-  }
-
-  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, 
InnerValLike)]): QueryParam = {
-    //    val len = label.orderTypes.size.toByte
-    //    val len = 
label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
-    //    logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
-    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
-
-    val minMetaByte = InnerVal.minMetaByte
-    //    val maxMetaByte = InnerVal.maxMetaByte
-    val maxMetaByte = -1.toByte
-    val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte))
-    //FIXME
-    val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte))
-    toVal(0) = len
-    fromVal(0) = len
-    val maxBytes = fromVal
-    val minBytes = toVal
-    this.columnRangeFilterMaxBytes = maxBytes
-    this.columnRangeFilterMinBytes = minBytes
-    val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
-    this.columnRangeFilter = rangeFilter
-    this
-  }
-
-  def duration(minMaxTs: Option[(Long, Long)]): QueryParam = {
-    minMaxTs match {
-      case Some((minTs, maxTs)) => duration(minTs, maxTs)
-      case _ => this
-    }
-  }
-
-  def duration(minTs: Long, maxTs: Long): QueryParam = {
-    this.duration = Some((minTs, maxTs))
-    this
-  }
-
-  def rank(r: RankParam): QueryParam = {
-    this.rank = r
-    this
-  }
-
-  def exclude(filterOut: Boolean): QueryParam = {
-    this.exclude = filterOut
-    this
-  }
-
-  def include(filterIn: Boolean): QueryParam = {
-    this.include = filterIn
-    this
-  }
-
-  def has(hasFilters: Map[Byte, InnerValLike]): QueryParam = {
-    this.hasFilters = hasFilters
-    this
-  }
-
-  def where(whereTry: Try[Where]): QueryParam = {
-    this.where = whereTry
-    this
-  }
-
-  def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = {
-    this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First)
-    this
-  }
-
-  def rpcTimeout(millis: Int): QueryParam = {
-    this.rpcTimeoutInMillis = millis
-    this
-  }
-
-  def maxAttempt(attempt: Int): QueryParam = {
-    this.maxAttempt = attempt
-    this
-  }
-
-  def includeDegree(includeDegree: Boolean): QueryParam = {
-    this.includeDegree = includeDegree
-    this
-  }
-
-  def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = {
-    this.tgtVertexInnerIdOpt = other
-    this
-  }
-
-  def cacheTTLInMillis(other: Long): QueryParam = {
-    this.cacheTTLInMillis = other
-    this
-  }
-
-  def timeDecay(other: Option[TimeDecay]): QueryParam = {
-    this.timeDecay = other
-    this
-  }
-
-  def threshold(other: Double): QueryParam = {
-    this.threshold = other
-    this
-  }
-
-  def transformer(other: Option[JsValue]): QueryParam = {
-    other match {
-      case Some(js) => this.transformer = EdgeTransformer(this, js)
-      case None =>
-    }
-    this
-  }
-
-  def scorePropagateOp(scorePropagateOp: String): QueryParam = {
-    this.scorePropagateOp = scorePropagateOp
-    this
-  }
-
-  def shouldNormalize(shouldNormalize: Boolean): QueryParam = {
-    this.shouldNormalize = shouldNormalize
-    this
-  }
-
-  def whereRawOpt(sqlOpt: Option[String]): QueryParam = {
-    this.whereRawOpt = sqlOpt
-    this
-  }
-
-  def cursorOpt(cursorOpt: Option[String]): QueryParam = {
-    this.cursorOpt = cursorOpt
-    this
-  }
-
-  def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined
-
-  override def toString = {
-    List(label.label, labelOrderSeq, offset, limit, rank,
-      duration, isInverted, exclude, include, hasFilters).mkString("\t")
-    //      duration, isInverted, exclude, include, hasFilters, 
outputFields).mkString("\t")
-  }
-
-  //
-  //  def buildGetRequest(srcVertex: Vertex) = {
-  //    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-  //    val (srcInnerId, tgtInnerId) = tgtVertexInnerIdOpt match {
-  //      case Some(tgtVertexInnerId) => // _to is given.
-  //        /** we use toInvertedEdgeHashLike so dont need to swap src, tgt */
-  //        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-  //        val tgt = InnerVal.convertVersion(tgtVertexInnerId, 
tgtColumn.columnType, label.schemaVersion)
-  //        (src, tgt)
-  //      case None =>
-  //        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-  //        (src, src)
-  //    }
-  //
-  //    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), 
TargetVertexId(tgtColumn.id.get, tgtInnerId))
-  //    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
-  //    val edge = Edge(srcV, tgtV, labelWithDir)
-  //
-  //    val get = if (tgtVertexInnerIdOpt.isDefined) {
-  //      val snapshotEdge = edge.toInvertedEdgeHashLike
-  //      val kv = snapshotEdge.kvs.head
-  //      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, 
kv.qualifier)
-  //    } else {
-  //      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq 
== labelOrderSeq)
-  //      assert(indexedEdgeOpt.isDefined)
-  //      val indexedEdge = indexedEdgeOpt.get
-  //      val kv = indexedEdge.kvs.head
-  //      val table = label.hbaseTableName.getBytes
-  //        //kv.table //
-  //      val rowKey = kv.row // indexedEdge.rowKey.bytes
-  //      val cf = edgeCf
-  //      new GetRequest(table, rowKey, cf)
-  //    }
-  //
-  //    val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue))
-  //
-  //    get.maxVersions(1)
-  //    get.setFailfast(true)
-  //    get.setMaxResultsPerColumnFamily(limit)
-  //    get.setRowOffsetPerColumnFamily(offset)
-  //    get.setMinTimestamp(minTs)
-  //    get.setMaxTimestamp(maxTs)
-  //    get.setTimeout(rpcTimeoutInMillis)
-  //    if (columnRangeFilter != null) get.setFilter(columnRangeFilter)
-  //    //    get.setMaxAttempt(maxAttempt.toByte)
-  //    //    get.setRpcTimeout(rpcTimeoutInMillis)
-  //
-  //    //    if (columnRangeFilter != null) get.filter(columnRangeFilter)
-  //    //    logger.debug(s"Get: $get, $offset, $limit")
-  //
-  //    get
-  //  }
-}
-
-case class TimeDecay(initial: Double = 1.0,
-                     lambda: Double = 0.1,
-                     timeUnit: Double = 60 * 60 * 24,
-                     labelMetaSeq: Byte = LabelMeta.timeStampSeq) {
-  def decay(diff: Double): Double = {
-    //FIXME
-    val ret = initial * Math.pow(1.0 - lambda, diff / timeUnit)
-    //    logger.debug(s"$initial, $lambda, $timeUnit, $diff, ${diff / 
timeUnit}, $ret")
-    ret
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
deleted file mode 100644
index 02d9736..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/QueryResult.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-package com.kakao.s2graph.core
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
-
-import scala.collection.Seq
-
-object QueryResult {
-  def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
-    if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
-      Seq.empty
-    } else {
-      val queryParam = query.steps.head.queryParams.head
-      val label = queryParam.label
-      val currentTs = System.currentTimeMillis()
-      val propsWithTs = Map(LabelMeta.timeStampSeq ->
-        InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs))
-      for {
-        vertex <- query.vertices
-      } yield {
-        val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = 
propsWithTs)
-        val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
-        QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
-          QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
-      }
-    }
-  }
-}
-case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: 
QueryResult)
-
-case class QueryRequest(query: Query,
-                        stepIdx: Int,
-                        vertex: Vertex,
-                        queryParam: QueryParam)
-
-
-case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
-                       tailCursor: Array[Byte] = Array.empty,
-                       timestamp: Long = System.currentTimeMillis(),
-                       isFailure: Boolean = false)
-
-case class EdgeWithScore(edge: Edge, score: Double)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
deleted file mode 100644
index c2b86c2..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Vertex.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.kakao.s2graph.core
-
-
-import com.kakao.s2graph.core.mysqls._
-
-//import com.kakao.s2graph.core.models._
-
-import com.kakao.s2graph.core.types._
-import play.api.libs.json.Json
-
-/**
-  */
-case class Vertex(id: VertexId,
-                  ts: Long = System.currentTimeMillis(),
-                  props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
-                  op: Byte = 0,
-                  belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement {
-
-  val innerId = id.innerId
-
-  def schemaVer = serviceColumn.schemaVersion
-
-  def serviceColumn = ServiceColumn.findById(id.colId)
-
-  def service = Service.findById(serviceColumn.serviceId)
-
-  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
service.hTableName)
-
-  def defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq.toInt -> 
InnerVal.withLong(ts, schemaVer))
-
-  //  lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues
-
-  /** TODO: make this as configurable */
-  override def serviceName = service.serviceName
-
-  override def isAsync = false
-
-  override def queueKey = Seq(ts.toString, serviceName).mkString("|")
-
-  override def queuePartitionKey = id.innerId.toString
-
-  def propsWithName = for {
-    (seq, v) <- props
-    meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte)
-  } yield (meta.name -> v.toString)
-
-  //  /** only used by bulk loader */
-  //  def buildPuts(): List[Put] = {
-  //    //    logger.error(s"put: $this => $rowKey")
-  ////    val put = new Put(rowKey.bytes)
-  ////    for ((q, v) <- qualifiersWithValues) {
-  ////      put.addColumn(vertexCf, q, ts, v)
-  ////    }
-  ////    List(put)
-  //    val kv = kvs.head
-  //    val put = new Put(kv.row)
-  //    kvs.map { kv =>
-  //      put.addColumn(kv.cf, kv.qualifier, kv.timestamp, kv.value)
-  //    }
-  //    List(put)
-  //  }
-
-  def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op)
-
-
-  override def hashCode() = {
-    val hash = id.hashCode()
-    //    logger.debug(s"Vertex.hashCode: $this -> $hash")
-    hash
-  }
-
-  override def equals(obj: Any) = {
-    obj match {
-      case otherVertex: Vertex =>
-        val ret = id == otherVertex.id
-        //        logger.debug(s"Vertex.equals: $this, $obj => $ret")
-        ret
-      case _ => false
-    }
-  }
-
-  def withProps(newProps: Map[Int, InnerValLike]) = Vertex(id, ts, newProps, 
op)
-
-  def toLogString(): String = {
-    val (serviceName, columnName) =
-      if (!id.storeColId) ("", "")
-      else (serviceColumn.service.serviceName, serviceColumn.columnName)
-
-    if (propsWithName.nonEmpty)
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, 
Json.toJson(propsWithName)).mkString("\t")
-    else
-      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, 
columnName).mkString("\t")
-  }
-}
-
-object Vertex {
-
-  def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId
-
-  def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue
-
-  def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
-
-  //  val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, 
CompositeId.defaultInnerId, false, true),
-  //    System.currentTimeMillis())
-  def fromString(s: String): Option[Vertex] = Graph.toVertex(s)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala
deleted file mode 100644
index edc6147..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Bucket.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-/**
- * Created by shon on 8/5/15.
- */
-
-import scalikejdbc._
-
-object Bucket extends Model[Bucket] {
-
-  val rangeDelimiter = "~"
-  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
-
-  def apply(rs: WrappedResultSet): Bucket = {
-    Bucket(rs.intOpt("id"),
-      rs.int("experiment_id"),
-      rs.string("modular"),
-      rs.string("http_verb"),
-      rs.string("api_path"),
-      rs.string("request_body"),
-      rs.int("timeout"),
-      rs.string("impression_id"),
-      rs.boolean("is_graph_query"),
-      rs.boolean("is_empty"))
-  }
-
-  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): 
List[Bucket] = {
-    val cacheKey = "experimentId=" + experimentId
-    withCaches(cacheKey) {
-      sql"""select * from buckets where experiment_id = $experimentId"""
-        .map { rs => Bucket(rs) }.list().apply()
-    }
-  }
-
-  def toRange(str: String): Option[(Int, Int)] = {
-    val range = str.split(rangeDelimiter)
-    if (range.length == 2) Option(range.head.toInt, range.last.toInt)
-    else None
-  }
-
-  def findByImpressionId(impressionId: String, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
-    val cacheKey = "impressionId=" + impressionId
-    val sql = sql"""select * from buckets where impression_id=$impressionId"""
-      .map { rs => Bucket(rs)}
-    if (useCache) {
-      withCache(cacheKey) {
-        sql.single().apply()
-      }
-    } else {
-      sql.single().apply()
-    }
-  }
-}
-
-case class Bucket(id: Option[Int],
-                  experimentId: Int,
-                  modular: String,
-                  httpVerb: String, apiPath: String,
-                  requestBody: String, timeout: Int, impressionId: String,
-                  isGraphQuery: Boolean = true,
-                  isEmpty: Boolean = false) {
-
-  import Bucket._
-
-  lazy val rangeOpt = toRange(modular)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala
deleted file mode 100644
index 1b6d55f..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ColumnMeta.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-import play.api.libs.json.Json
-import scalikejdbc._
-
-object ColumnMeta extends Model[ColumnMeta] {
-
-  val timeStampSeq = 0.toByte
-  val countSeq = -1.toByte
-  val lastModifiedAtColumnSeq = 0.toByte
-  val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", 
lastModifiedAtColumnSeq, "long")
-  val maxValue = Byte.MaxValue
-
-  def apply(rs: WrappedResultSet): ColumnMeta = {
-    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), 
rs.byte("seq"), rs.string("data_type").toLowerCase())
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    //    val cacheKey = s"id=$id"
-    val cacheKey = "id=" + id
-    withCache(cacheKey) {
-      sql"""select * from column_metas where id = ${id}""".map { rs => 
ColumnMeta(rs) }.single.apply
-    }.get
-  }
-
-  def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit 
session: DBSession = AutoSession) = {
-    //    val cacheKey = s"columnId=$columnId"
-    val cacheKey = "columnId=" + columnId
-    if (useCache) {
-      withCaches(cacheKey)( sql"""select *from column_metas where column_id = 
${columnId} order by seq ASC"""
-        .map { rs => ColumnMeta(rs) }.list.apply())
-    } else {
-      sql"""select * from column_metas where column_id = ${columnId} order by 
seq ASC"""
-        .map { rs => ColumnMeta(rs) }.list.apply()
-    }
-  }
-
-  def findByName(columnId: Int, name: String)(implicit session: DBSession = 
AutoSession) = {
-    //    val cacheKey = s"columnId=$columnId:name=$name"
-    val cacheKey = "columnId=" + columnId + ":name=" + name
-    withCache(cacheKey)( sql"""select * from column_metas where column_id = 
${columnId} and name = ${name}"""
-      .map { rs => ColumnMeta(rs) }.single.apply())
-  }
-
-  def insert(columnId: Int, name: String, dataType: String)(implicit session: 
DBSession = AutoSession) = {
-    val ls = findAllByColumn(columnId, false)
-    val seq = ls.size + 1
-    if (seq <= maxValue) {
-      sql"""insert into column_metas(column_id, name, seq, data_type)
-    select ${columnId}, ${name}, ${seq}, ${dataType}"""
-        .updateAndReturnGeneratedKey.apply()
-    }
-  }
-
-  def findOrInsert(columnId: Int, name: String, dataType: String)(implicit 
session: DBSession = AutoSession): ColumnMeta = {
-    findByName(columnId, name) match {
-      case Some(c) => c
-      case None =>
-        insert(columnId, name, dataType)
-        expireCache(s"columnId=$columnId:name=$name")
-        findByName(columnId, name).get
-    }
-  }
-
-  def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "columnId=" + columnId + ":seq=" + seq
-    lazy val columnMetaOpt = sql"""
-        select * from column_metas where column_id = ${columnId} and seq = 
${seq}
-    """.map { rs => ColumnMeta(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(columnMetaOpt)
-    else columnMetaOpt
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val columnMeta = findById(id)
-    val (columnId, name) = (columnMeta.columnId, columnMeta.name)
-    sql"""delete from column_metas where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", 
s"colunmId=$columnId")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from column_metas""".map { rs => ColumnMeta(rs) 
}.list().apply()
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"columnId=${x.columnId}:name=${x.name}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}"
-      (cacheKey -> x)
-    })
-    putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
-      val cacheKey = s"columnId=${columnId}"
-      (cacheKey -> ls)
-    }.toList)
-  }
-}
-
-case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, 
dataType: String) {
-  lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
deleted file mode 100644
index 46b92ab..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Experiment.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-import com.kakao.s2graph.core.GraphUtil
-import com.kakao.s2graph.core.utils.logger
-import scalikejdbc._
-
-import scala.util.Random
-
-object Experiment extends Model[Experiment] {
-  val impressionKey = "S2-Impression-Id"
-
-  def apply(rs: WrappedResultSet): Experiment = {
-    Experiment(rs.intOpt("id"),
-      rs.int("service_id"),
-      rs.string("name"),
-      rs.string("description"),
-      rs.string("experiment_type"),
-      rs.int("total_modular"))
-  }
-
-  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): 
List[Experiment] = {
-    val cacheKey = "serviceId=" + serviceId
-    withCaches(cacheKey) {
-      sql"""select * from experiments where service_id = ${serviceId}"""
-        .map { rs => Experiment(rs) }.list().apply()
-    }
-  }
-
-  def findBy(serviceId: Int, name: String)(implicit session: DBSession = 
AutoSession): Option[Experiment] = {
-    val cacheKey = "serviceId=" + serviceId + ":name=" + name
-    withCache(cacheKey) {
-      sql"""select * from experiments where service_id = ${serviceId} and name 
= ${name}"""
-        .map { rs => Experiment(rs) }.single.apply
-    }
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession): 
Option[Experiment] = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""select * from experiments where id = ${id}"""
-        .map { rs => Experiment(rs) }.single.apply
-    )
-  }
-}
-
-case class Experiment(id: Option[Int],
-                      serviceId: Int,
-                      name: String,
-                      description: String,
-                      experimentType: String,
-                      totalModular: Int) {
-
-  def buckets = Bucket.finds(id.get)
-
-  def rangeBuckets = for {
-    bucket <- buckets
-    range <- bucket.rangeOpt
-  } yield range -> bucket
-
-
-  def findBucket(uuid: String, impIdOpt: Option[String] = None): 
Option[Bucket] = {
-    impIdOpt match {
-      case Some(impId) => Bucket.findByImpressionId(impId)
-      case None =>
-        val seed = experimentType match {
-          case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
-          case _ => Random.nextInt(totalModular) + 1
-        }
-        findBucket(seed)
-    }
-  }
-
-  def findBucket(uuidMod: Int): Option[Bucket] = {
-    rangeBuckets.find { case ((from, to), bucket) =>
-      from <= uuidMod && uuidMod <= to
-    }.map(_._2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
deleted file mode 100644
index 005a01e..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
+++ /dev/null
@@ -1,372 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-import com.kakao.s2graph.core.GraphExceptions.ModelNotFoundException
-import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{GraphUtil, JSONParser, Management}
-import play.api.libs.json.Json
-import scalikejdbc._
-
-object Label extends Model[Label] {
-
-  val maxHBaseTableNames = 2
-
-  def apply(rs: WrappedResultSet): Label = {
-    Label(Option(rs.int("id")), rs.string("label"),
-      rs.int("src_service_id"), rs.string("src_column_name"), 
rs.string("src_column_type"),
-      rs.int("tgt_service_id"), rs.string("tgt_column_name"), 
rs.string("tgt_column_type"),
-      rs.boolean("is_directed"), rs.string("service_name"), 
rs.int("service_id"), rs.string("consistency_level"),
-      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), 
rs.string("schema_version"), rs.boolean("is_async"), 
rs.string("compressionAlgorithm"))
-  }
-
-  def deleteAll(label: Label)(implicit session: DBSession) = {
-    val id = label.id
-    LabelMeta.findAllByLabelId(id.get, false).foreach { x => 
LabelMeta.delete(x.id.get) }
-    LabelIndex.findByLabelIdAll(id.get, false).foreach { x => 
LabelIndex.delete(x.id.get) }
-    Label.delete(id.get)
-  }
-
-  def findByName(labelName: String, useCache: Boolean = true)(implicit 
session: DBSession = AutoSession): Option[Label] = {
-    val cacheKey = "label=" + labelName
-    lazy val labelOpt =
-      sql"""
-        select *
-        from labels
-        where label = ${labelName}""".map { rs => Label(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(labelOpt)
-    else labelOpt
-  }
-
-  def insert(label: String,
-             srcServiceId: Int,
-             srcColumnName: String,
-             srcColumnType: String,
-             tgtServiceId: Int,
-             tgtColumnName: String,
-             tgtColumnType: String,
-             isDirected: Boolean,
-             serviceName: String,
-             serviceId: Int,
-             consistencyLevel: String,
-             hTableName: String,
-             hTableTTL: Option[Int],
-             schemaVersion: String,
-             isAsync: Boolean,
-             compressionAlgorithm: String)(implicit session: DBSession = 
AutoSession) = {
-    sql"""
-       insert into labels(label,
-    src_service_id, src_column_name, src_column_type,
-    tgt_service_id, tgt_column_name, tgt_column_type,
-    is_directed, service_name, service_id, consistency_level, 
hbase_table_name, hbase_table_ttl, schema_version, is_async, 
compressionAlgorithm)
-       values (${label},
-    ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
-    ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
-    ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, 
${hTableName}, ${hTableTTL},
-    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): 
Option[Label] = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""
-        select         *
-        from   labels
-        where  id = ${id}"""
-        .map { rs => Label(rs) }.single.apply())
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""
-        select         *
-        from   labels
-        where  id = ${id}"""
-        .map { rs => Label(rs) }.single.apply()).get
-  }
-
-  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
-    val cacheKey = "tgtColumnId=" + columnId
-    val col = ServiceColumn.findById(columnId)
-    withCaches(cacheKey)(
-      sql"""
-          select       *
-          from labels
-          where        tgt_column_name = ${col.columnName}
-          and service_id = ${col.serviceId}
-        """.map { rs => Label(rs) }.list().apply())
-  }
-
-  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
-    val cacheKey = "srcColumnId=" + columnId
-    val col = ServiceColumn.findById(columnId)
-    withCaches(cacheKey)(
-      sql"""
-          select       *
-          from labels
-          where        src_column_name = ${col.columnName}
-          and service_id = ${col.serviceId}
-        """.map { rs => Label(rs) }.list().apply())
-  }
-
-  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
-    val cacheKey = "srcServiceId=" + serviceId
-    withCaches(cacheKey)(
-      sql"""select * from labels where src_service_id = ${serviceId}""".map { 
rs => Label(rs) }.list().apply
-    )
-  }
-
-  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
-    val cacheKey = "tgtServiceId=" + serviceId
-    withCaches(cacheKey)(
-      sql"""select * from labels where tgt_service_id = ${serviceId}""".map { 
rs => Label(rs) }.list().apply
-    )
-  }
-
-  def insertAll(labelName: String, srcServiceName: String, srcColumnName: 
String, srcColumnType: String,
-                tgtServiceName: String, tgtColumnName: String, tgtColumnType: 
String,
-                isDirected: Boolean = true,
-                serviceName: String,
-                indices: Seq[Index],
-                metaProps: Seq[Prop],
-                consistencyLevel: String,
-                hTableName: Option[String],
-                hTableTTL: Option[Int],
-                schemaVersion: String,
-                isAsync: Boolean,
-                compressionAlgorithm: String)(implicit session: DBSession = 
AutoSession): Label = {
-
-    val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
-    val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
-    val serviceOpt = Service.findByName(serviceName, useCache = false)
-    if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service 
$srcServiceName is not created.")
-    if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service 
$tgtServiceName is not created.")
-    if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName 
is not created.")
-
-    val newLabel = for {
-      srcService <- srcServiceOpt
-      tgtService <- tgtServiceOpt
-      service <- serviceOpt
-    } yield {
-        val srcServiceId = srcService.id.get
-        val tgtServiceId = tgtService.id.get
-        val serviceId = service.id.get
-
-        /** insert serviceColumn */
-        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, 
Some(srcColumnType), schemaVersion)
-        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, 
Some(tgtColumnType), schemaVersion)
-
-        if (srcCol.columnType != srcColumnType) throw new 
RuntimeException(s"source service column type not matched ${srcCol.columnType} 
!= ${srcColumnType}")
-        if (tgtCol.columnType != tgtColumnType) throw new 
RuntimeException(s"target service column type not matched ${tgtCol.columnType} 
!= ${tgtColumnType}")
-
-        /** create label */
-        Label.findByName(labelName, useCache = false).getOrElse {
-
-          val createdId = insert(labelName, srcServiceId, srcColumnName, 
srcColumnType,
-            tgtServiceId, tgtColumnName, tgtColumnType, isDirected, 
serviceName, serviceId, consistencyLevel,
-            hTableName.getOrElse(service.hTableName), 
hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, 
compressionAlgorithm).toInt
-
-          val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, 
dataType) =>
-            val labelMeta = LabelMeta.findOrInsert(createdId, propName, 
defaultValue, dataType)
-            (propName -> labelMeta.seq)
-          }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name 
-> labelMeta.seq).toMap
-
-          if (indices.isEmpty) {
-            // make default index with _PK, _timestamp, 0
-            LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, 
LabelIndex.DefaultMetaSeqs.toList, "none")
-          } else {
-            indices.foreach { index =>
-              val metaSeq = index.propNames.map { name => labelMetaMap(name) }
-              LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, 
"none")
-            }
-          }
-
-          val cacheKeys = List(s"id=$createdId", s"label=$labelName")
-          val ret = findByName(labelName, useCache = false).get
-          putsToCache(cacheKeys.map(k => k -> ret))
-          ret
-        }
-      }
-
-    newLabel.getOrElse(throw new RuntimeException("failed to create label"))
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from labels""".map { rs => Label(rs) 
}.list().apply()
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"label=${x.label}"
-      (cacheKey -> x)
-    })
-  }
-
-  def updateName(oldName: String, newName: String)(implicit session: DBSession 
= AutoSession) = {
-    logger.info(s"rename label: $oldName -> $newName")
-    sql"""update labels set label = ${newName} where label = 
${oldName}""".update.apply()
-  }
-
-  def updateHTableName(labelName: String, newHTableName: String)(implicit 
session: DBSession = AutoSession) = {
-    logger.info(s"update HTable of label $labelName to $newHTableName")
-    val cnt = sql"""update labels set hbase_table_name = $newHTableName where 
label = $labelName""".update().apply()
-    val label = Label.findByName(labelName, useCache = false).get
-
-    val cacheKeys = List(s"id=${label.id}", s"label=${label.label}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-    cnt
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val label = findById(id)
-    logger.info(s"delete label: $label")
-    val cnt = sql"""delete from labels where id = 
${label.id.get}""".update().apply()
-    val cacheKeys = List(s"id=$id", s"label=${label.label}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-    cnt
-  }
-}
-
-case class Label(id: Option[Int], label: String,
-                 srcServiceId: Int, srcColumnName: String, srcColumnType: 
String,
-                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: 
String,
-                 isDirected: Boolean = true, serviceName: String, serviceId: 
Int, consistencyLevel: String = "strong",
-                 hTableName: String, hTableTTL: Option[Int],
-                 schemaVersion: String, isAsync: Boolean = false,
-                 compressionAlgorithm: String) extends JSONParser {
-  def metas = LabelMeta.findAllByLabelId(id.get)
-
-  def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
-
-  //  lazy val firstHBaseTableName = 
hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
-  lazy val srcService = Service.findById(srcServiceId)
-  lazy val tgtService = Service.findById(tgtServiceId)
-  lazy val service = Service.findById(serviceId)
-  /**
-   * TODO
-   * change this to apply hbase table from target serviceName
-   */
-  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
-  //  lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, 
hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
-  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
-  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
hTableName.split(",").head)
-
-  lazy val srcColumn = ServiceColumn.find(srcServiceId, 
srcColumnName).getOrElse(throw ModelNotFoundException("Source column not 
found"))
-  lazy val tgtColumn = ServiceColumn.find(tgtServiceId, 
tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not 
found"))
-
-  lazy val direction = if (isDirected) "out" else "undirected"
-  lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, 
LabelIndex.DefaultSeq)
-
-  //TODO: Make sure this is correct
-  lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
-  lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
-  lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
-  lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
-  lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && 
idx.id.get != defaultIndex.get.id.get)
-  //      indices filterNot (_.id.get == defaultIndex.get.id.get)
-  lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap
-
-  lazy val metaProps = LabelMeta.reservedMetas.map { m =>
-    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
-    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
-    else m
-  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
-
-  lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
-    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
-    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
-    else m
-  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
-
-  lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
-  lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
-  lazy val metaPropNames = metaProps.map(x => x.name)
-  lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
-
-  /** this is used only by edgeToProps */
-  lazy val metaPropsDefaultMap = (for {
-    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
-    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, 
schemaVersion), prop.dataType)
-  } yield prop.name -> jsValue).toMap
-
-  lazy val metaPropsDefaultMapInner = (for {
-    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
-    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, 
schemaVersion), prop.dataType)
-  } yield prop.name -> jsValue).toMap
-
-  def srcColumnWithDir(dir: Int) = {
-    if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn
-  }
-
-  def tgtColumnWithDir(dir: Int) = {
-    if (dir == GraphUtil.directions("out")) tgtColumn else srcColumn
-  }
-
-  def srcTgtColumn(dir: Int) =
-    if (isDirected) {
-      (srcColumnWithDir(dir), tgtColumnWithDir(dir))
-    } else {
-      if (dir == GraphUtil.directions("in")) {
-        (tgtColumn, srcColumn)
-      } else {
-        (srcColumn, tgtColumn)
-      }
-    }
-
-  def init() = {
-    metas
-    metaSeqsToNames
-    service
-    srcColumn
-    tgtColumn
-    defaultIndex
-    indices
-    metaProps
-  }
-
-  //  def srcColumnInnerVal(jsValue: JsValue) = {
-  //    jsValueToInnerVal(jsValue, srcColumnType, version)
-  //  }
-  //  def tgtColumnInnerVal(jsValue: JsValue) = {
-  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
-  //  }
-
-  override def toString(): String = {
-    val orderByKeys = LabelMeta.findAllByLabelId(id.get)
-    super.toString() + orderByKeys.toString()
-  }
-
-  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
-  //    if (scoring.isEmpty) LabelIndex.defaultSeq
-  //    else {
-  //      LabelIndex.findByLabelIdAndSeqs(id.get, 
scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
-  //
-  ////      LabelIndex.findByLabelIdAndSeqs(id.get, 
scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
-  //    }
-  //  }
-  lazy val toJson = Json.obj("labelName" -> label,
-    "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
-    "isDirected" -> isDirected,
-    "serviceName" -> serviceName,
-    "consistencyLevel" -> consistencyLevel,
-    "schemaVersion" -> schemaVersion,
-    "isAsync" -> isAsync,
-    "compressionAlgorithm" -> compressionAlgorithm,
-    "defaultIndex" -> defaultIndex.map(x => x.toJson),
-    "extraIndex" -> extraIndices.map(exIdx => exIdx.toJson),
-    "metaProps" -> metaProps.filter { labelMeta => 
LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson)
-  )
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala
deleted file mode 100644
index 47f4a2a..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelIndex.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-/**
- * Created by shon on 6/3/15.
- */
-
-import play.api.libs.json.Json
-import scalikejdbc._
-
-object LabelIndex extends Model[LabelIndex] {
-  val DefaultName = "_PK"
-  val DefaultMetaSeqs = Seq(LabelMeta.timeStampSeq)
-  val DefaultSeq = 1.toByte
-  val MaxOrderSeq = 7
-
-  def apply(rs: WrappedResultSet): LabelIndex = {
-    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), 
rs.byte("seq"),
-      rs.string("meta_seqs").split(",").filter(_ != "").map(s => 
s.toByte).toList match {
-        case metaSeqsList => metaSeqsList
-      },
-      rs.string("formulars"))
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey) {
-      sql"""select * from label_indices where id = ${id}""".map { rs => 
LabelIndex(rs) }.single.apply
-    }.get
-  }
-
-  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit 
session: DBSession = AutoSession) = {
-    val cacheKey = "labelId=" + labelId
-    if (useCache) {
-      withCaches(cacheKey)( sql"""
-        select * from label_indices where label_id = ${labelId} and seq > 0 
order by seq ASC
-      """.map { rs => LabelIndex(rs) }.list.apply)
-    } else {
-      sql"""
-        select * from label_indices where label_id = ${labelId} and seq > 0 
order by seq ASC
-      """.map { rs => LabelIndex(rs) }.list.apply
-    }
-  }
-
-  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], 
formulars: String)(implicit session: DBSession = AutoSession): Long = {
-    sql"""
-       insert into label_indices(label_id, name, seq, meta_seqs, formulars)
-       values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, 
${formulars})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], 
formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = {
-    findByLabelIdAndSeqs(labelId, metaSeqs) match {
-      case Some(s) => s
-      case None =>
-        val orders = findByLabelIdAll(labelId, false)
-        val seq = (orders.size + 1).toByte
-        assert(seq <= MaxOrderSeq)
-        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars)
-        val cacheKeys = List(s"labelId=$labelId:seq=$seq",
-          s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", 
s"id=$createdId")
-        cacheKeys.foreach { key =>
-          expireCache(key)
-          expireCaches(key)
-        }
-
-        findByLabelIdAndSeq(labelId, seq).get
-    }
-  }
-
-  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: 
DBSession = AutoSession): Option[LabelIndex] = {
-    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",")
-    withCache(cacheKey) {
-      sql"""
-      select * from label_indices where label_id = ${labelId} and meta_seqs = 
${seqs.mkString(",")}
-      """.map { rs => LabelIndex(rs) }.single.apply
-    }
-  }
-
-  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
-    //    val cacheKey = s"labelId=$labelId:seq=$seq"
-    val cacheKey = "labelId=" + labelId + ":seq=" + seq
-    if (useCache) {
-      withCache(cacheKey)( sql"""
-      select * from label_indices where label_id = ${labelId} and seq = ${seq}
-      """.map { rs => LabelIndex(rs) }.single.apply)
-    } else {
-      sql"""
-      select * from label_indices where label_id = ${labelId} and seq = ${seq}
-      """.map { rs => LabelIndex(rs) }.single.apply
-    }
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val labelIndex = findById(id)
-    val seqs = labelIndex.metaSeqs.mkString(",")
-    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
-    sql"""delete from label_indices where id = ${id}""".execute.apply()
-
-    val cacheKeys = List(s"id=$id", s"labelId=$labelId", 
s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) 
}.list.apply
-    putsToCache(ls.map { x =>
-      var cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}"
-      (cacheKey -> x)
-    })
-    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
-      val cacheKey = s"labelId=${labelId}"
-      (cacheKey -> ls)
-    }.toList)
-  }
-}
-
-/**
- * formular
- * ex1): w1, w2, w3
- * ex2): 1.5 * w1^2 + 3.4 * (w1 * w2), w2, w1
- */
-
-case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, 
metaSeqs: Seq[Byte], formulars: String) {
-  lazy val label = Label.findById(labelId)
-  lazy val metas = label.metaPropsMap
-  lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => 
label.metaPropsMap.get(metaSeq))
-  lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
-  lazy val toJson = Json.obj(
-    "name" -> name,
-    "propNames" -> sortKeyTypes.map(x => x.name)
-  )
-}

Reply via email to