http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
new file mode 100644
index 0000000..46b3255
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -0,0 +1,553 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.GraphExceptions.BadQueryException
+import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, 
ServiceColumn}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike}
+import play.api.libs.json.{Json, _}
+
+import scala.collection.mutable.ListBuffer
+
+object PostProcess extends JSONParser {
+
+
+  type EDGE_VALUES = Map[String, JsValue]
+  type ORDER_BY_VALUES =  (Any, Any, Any, Any)
+  type RAW_EDGE = (EDGE_VALUES, Double, ORDER_BY_VALUES)
+
+  /**
+   * Result Entity score field name
+   */
+  val emptyDegrees = Seq.empty[JsValue]
+  val timeoutResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), 
"results" -> Json.arr(), "isTimeout" -> true)
+  val emptyResults = Json.obj("size" -> 0, "degrees" -> Json.arr(), "results" 
-> Json.arr(), "isEmpty" -> true)
+  def badRequestResults(ex: => Exception) = ex match {
+    case ex: BadQueryException => Json.obj("message" -> ex.msg)
+    case _ => Json.obj("message" -> ex.getMessage)
+  }
+
+  val SCORE_FIELD_NAME = "scoreSum"
+  val reservedColumns = Set("cacheRemain", "from", "to", "label", "direction", 
"_timestamp", "timestamp", "score", "props")
+
+  def groupEdgeResult(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
exclude: Seq[QueryRequestWithResult]) = {
+    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+    //    filterNot {case (edge, score) => 
edge.props.contains(LabelMeta.degreeSeq)}
+    val groupedEdgesWithRank = (for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
queryRequest.query.filterOutFields))
+    } yield {
+      (queryRequest.queryParam, edge, score)
+    }).groupBy {
+      case (queryParam, edge, rank) if edge.labelWithDir.dir == 
GraphUtil.directions("in") =>
+        (queryParam.label.srcColumn, queryParam.label.label, 
queryParam.label.tgtColumn, edge.tgtVertex.innerId, edge.isDegree)
+      case (queryParam, edge, rank) =>
+        (queryParam.label.tgtColumn, queryParam.label.label, 
queryParam.label.srcColumn, edge.tgtVertex.innerId, edge.isDegree)
+    }
+
+    val ret = for {
+      ((tgtColumn, labelName, srcColumn, target, isDegreeEdge), edgesAndRanks) 
<- groupedEdgesWithRank if !isDegreeEdge
+      edgesWithRanks = edgesAndRanks.groupBy(x => 
x._2.srcVertex).map(_._2.head)
+      id <- innerValToJsValue(target, tgtColumn.columnType)
+    } yield {
+      Json.obj("name" -> tgtColumn.columnName, "id" -> id,
+        SCORE_FIELD_NAME -> edgesWithRanks.map(_._3).sum,
+        "label" -> labelName,
+        "aggr" -> Json.obj(
+          "name" -> srcColumn.columnName,
+          "ids" -> edgesWithRanks.flatMap { case (queryParam, edge, rank) =>
+            innerValToJsValue(edge.srcVertex.innerId, srcColumn.columnType)
+          },
+          "edges" -> edgesWithRanks.map { case (queryParam, edge, rank) =>
+            Json.obj("id" -> innerValToJsValue(edge.srcVertex.innerId, 
srcColumn.columnType),
+              "props" -> propsToJson(edge),
+              "score" -> rank
+            )
+          }
+        )
+      )
+    }
+
+    ret.toList
+  }
+
+  def sortWithFormatted(jsons: Seq[JsObject],
+                        scoreField: String = "scoreSum",
+                        queryRequestWithResultLs: Seq[QueryRequestWithResult],
+                        decrease: Boolean = true): JsObject = {
+    val ordering = if (decrease) -1 else 1
+    val sortedJsons = jsons.sortBy { jsObject => (jsObject \ 
scoreField).as[Double] * ordering }
+
+    if (queryRequestWithResultLs.isEmpty) Json.obj("size" -> sortedJsons.size, 
"results" -> sortedJsons)
+    else Json.obj(
+      "size" -> sortedJsons.size,
+      "results" -> sortedJsons,
+      "impressionId" -> 
queryRequestWithResultLs.head.queryRequest.query.impressionId()
+    )
+  }
+
+  private def toHashKey(edge: Edge, queryParam: QueryParam, fields: 
Seq[String], delimiter: String = ","): Int = {
+    val ls = for {
+      field <- fields
+    } yield {
+      field match {
+        case "from" | "_from" => edge.srcVertex.innerId.toIdString()
+        case "to" | "_to" => edge.tgtVertex.innerId.toIdString()
+        case "label" => edge.labelWithDir.labelId
+        case "direction" => 
JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
+        case "_timestamp" | "timestamp" => edge.ts
+        case _ =>
+          queryParam.label.metaPropsInvMap.get(field) match {
+            case None => throw new RuntimeException(s"unknow column: $field")
+            case Some(labelMeta) => edge.propsWithTs.get(labelMeta.seq) match {
+              case None => labelMeta.defaultValue
+              case Some(propVal) => propVal
+            }
+          }
+      }
+    }
+    val ret = ls.hashCode()
+    ret
+  }
+
+  def resultInnerIds(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
isSrcVertex: Boolean = false): Seq[Int] = {
+    for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      q = queryRequest.query
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+    } yield  toHashKey(edge, queryRequest.queryParam, q.filterOutFields)
+  }
+
+  def summarizeWithListExcludeFormatted(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
+    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
+    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs, decrease = true)
+  }
+
+  def summarizeWithList(queryRequestWithResultLs: Seq[QueryRequestWithResult], 
exclude: Seq[QueryRequestWithResult]) = {
+    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
+    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs)
+  }
+
+  def summarizeWithListFormatted(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]) = {
+    val jsons = groupEdgeResult(queryRequestWithResultLs, exclude)
+    sortWithFormatted(jsons, queryRequestWithResultLs = 
queryRequestWithResultLs)
+  }
+
+  def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult]): JsValue = {
+    toSimpleVertexArrJson(queryRequestWithResultLs, 
Seq.empty[QueryRequestWithResult])
+  }
+
+  private def orderBy(queryOption: QueryOption,
+                      rawEdges: ListBuffer[(EDGE_VALUES, Double, 
ORDER_BY_VALUES)]): ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)] = {
+    import OrderingUtil._
+
+    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
+      val ascendingLs = queryOption.orderByColumns.map(_._2)
+      rawEdges.sortBy(_._3)(TupleMultiOrdering[Any](ascendingLs))
+    } else {
+      rawEdges
+    }
+  }
+
+  private def getColumnValue(keyWithJs: Map[String, JsValue], score: Double, 
edge: Edge, column: String): Any = {
+    column match {
+      case "score" => score
+      case "timestamp" | "_timestamp" => edge.ts
+      case _ =>
+        keyWithJs.get(column) match {
+          case None => keyWithJs.get("props").map { js => (js \ 
column).as[JsValue] }.get
+          case Some(x) => x
+        }
+    }
+  }
+
+  private def buildReplaceJson(jsValue: JsValue)(mapper: JsValue => JsValue): 
JsValue = {
+    def traverse(js: JsValue): JsValue = js match {
+      case JsNull => mapper(JsNull)
+      case JsUndefined() => mapper(JsUndefined(""))
+      case JsNumber(v) => mapper(js)
+      case JsString(v) => mapper(js)
+      case JsBoolean(v) => mapper(js)
+      case JsArray(elements) => JsArray(elements.map { t => 
traverse(mapper(t)) })
+      case JsObject(values) => JsObject(values.map { case (k, v) => k -> 
traverse(mapper(v)) })
+    }
+
+    traverse(jsValue)
+  }
+
+  /** test query with filterOut is not working since it can not diffrentate 
filterOut */
+  private def buildNextQuery(jsonQuery: JsValue, _cursors: Seq[Seq[String]]): 
JsValue = {
+    val cursors = _cursors.flatten.iterator
+
+    buildReplaceJson(jsonQuery) {
+      case js@JsObject(fields) =>
+        val isStep = fields.find { case (k, _) => k == "label" } // find label 
group
+        if (isStep.isEmpty) js
+        else {
+          // TODO: Order not ensured
+          val withCursor = js.fieldSet | Set("cursor" -> 
JsString(cursors.next))
+          JsObject(withCursor.toSeq)
+        }
+      case js => js
+    }
+  }
+
+  private def buildRawEdges(queryOption: QueryOption,
+                            queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
+                            excludeIds: Map[Int, Boolean],
+                            scoreWeight: Double = 1.0): (ListBuffer[JsValue], 
ListBuffer[RAW_EDGE]) = {
+    val degrees = ListBuffer[JsValue]()
+    val rawEdges = ListBuffer[(EDGE_VALUES, Double, ORDER_BY_VALUES)]()
+    val metaPropNamesMap = scala.collection.mutable.Map[String, Int]()
+    for {
+      queryRequestWithResult <- queryRequestWithResultLs
+    } yield {
+      val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      queryRequest.queryParam.label.metaPropNames.foreach { metaPropName =>
+        metaPropNamesMap.put(metaPropName, 
metaPropNamesMap.getOrElse(metaPropName, 0) + 1)
+      }
+    }
+    val propsExistInAll = metaPropNamesMap.filter(kv => kv._2 == 
queryRequestWithResultLs.length)
+    val orderByColumns = queryOption.orderByColumns.filter { case (column, _) 
=>
+      column match {
+        case "from" | "to" | "label" | "score" | "timestamp" | "_timestamp" => 
true
+        case _ =>
+          propsExistInAll.contains(column)
+//          //TODO??
+//          false
+//          queryParam.label.metaPropNames.contains(column)
+      }
+    }
+
+    /** build result jsons */
+    for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      queryParam = queryRequest.queryParam
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      hashKey = toHashKey(edge, queryRequest.queryParam, 
queryOption.filterOutFields)
+      if !excludeIds.contains(hashKey)
+    } {
+
+      // edge to json
+      val (srcColumn, _) = queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
+      val fromOpt = innerValToJsValue(edge.srcVertex.id.innerId, 
srcColumn.columnType)
+      if (edge.isDegree && fromOpt.isDefined) {
+        if (queryOption.limitOpt.isEmpty) {
+          degrees += Json.obj(
+            "from" -> fromOpt.get,
+            "label" -> queryRequest.queryParam.label.label,
+            "direction" -> GraphUtil.fromDirection(edge.labelWithDir.dir),
+            LabelMeta.degree.name -> 
innerValToJsValue(edge.propsWithTs(LabelMeta.degreeSeq).innerVal, InnerVal.LONG)
+          )
+        }
+      } else {
+        val keyWithJs = edgeToJson(edge, score, queryRequest.query, 
queryRequest.queryParam)
+        val orderByValues: (Any, Any, Any, Any) = orderByColumns.length match {
+          case 0 =>
+            (None, None, None, None)
+          case 1 =>
+            val it = orderByColumns.iterator
+            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            (v1, None, None, None)
+          case 2 =>
+            val it = orderByColumns.iterator
+            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            (v1, v2, None, None)
+          case 3 =>
+            val it = orderByColumns.iterator
+            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            (v1, v2, v3, None)
+          case _ =>
+            val it = orderByColumns.iterator
+            val v1 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            val v2 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            val v3 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            val v4 = getColumnValue(keyWithJs, score, edge, it.next()._1)
+            (v1, v2, v3, v4)
+        }
+
+        val currentEdge = (keyWithJs, score * scoreWeight, orderByValues)
+        rawEdges += currentEdge
+      }
+    }
+    (degrees, rawEdges)
+  }
+
+  private def buildResultJsValue(queryOption: QueryOption,
+                                 degrees: ListBuffer[JsValue],
+                                 rawEdges: ListBuffer[RAW_EDGE]): JsValue = {
+    if (queryOption.groupByColumns.isEmpty) {
+      // ordering
+      val filteredEdges = rawEdges.filter(t => t._2 >= 
queryOption.scoreThreshold)
+
+      val edges = queryOption.limitOpt match {
+        case None => orderBy(queryOption, filteredEdges).map(_._1)
+        case Some(limit) => orderBy(queryOption, 
filteredEdges).map(_._1).take(limit)
+      }
+      val resultDegrees = if (queryOption.returnDegree) degrees else 
emptyDegrees
+
+      Json.obj(
+        "size" -> edges.size,
+        "degrees" -> resultDegrees,
+//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
+        "results" -> edges
+      )
+    } else {
+      val grouped = rawEdges.groupBy { case (keyWithJs, _, _) =>
+        val props = keyWithJs.get("props")
+
+        for {
+          column <- queryOption.groupByColumns
+          value <- keyWithJs.get(column) match {
+            case None => props.flatMap { js => (js \ column).asOpt[JsValue] }
+            case Some(x) => Some(x)
+          }
+        } yield column -> value
+      }
+
+      val groupedEdgesWithScoreSum =
+        for {
+          (groupByKeyVals, groupedRawEdges) <- grouped
+          scoreSum = groupedRawEdges.map(x => x._2).sum if scoreSum >= 
queryOption.scoreThreshold
+        } yield {
+          // ordering
+          val edges = orderBy(queryOption, groupedRawEdges).map(_._1)
+
+          //TODO: refactor this
+          val js = if (queryOption.returnAgg)
+            Json.obj(
+              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "scoreSum" -> scoreSum,
+              "agg" -> edges
+            )
+          else
+            Json.obj(
+              "groupBy" -> Json.toJson(groupByKeyVals.toMap),
+              "scoreSum" -> scoreSum,
+              "agg" -> Json.arr()
+            )
+          (js, scoreSum)
+        }
+
+      val groupedSortedJsons = queryOption.limitOpt match {
+        case None =>
+          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => 
scoreSum * -1 }.map(_._1)
+        case Some(limit) =>
+          groupedEdgesWithScoreSum.toList.sortBy { case (jsVal, scoreSum) => 
scoreSum * -1 }.map(_._1).take(limit)
+      }
+      val resultDegrees = if (queryOption.returnDegree) degrees else 
emptyDegrees
+      Json.obj(
+        "size" -> groupedSortedJsons.size,
+        "degrees" -> resultDegrees,
+//        "queryNext" -> buildNextQuery(q.jsonQuery, q.cursorStrings()),
+        "results" -> groupedSortedJsons
+      )
+    }
+  }
+
+  def toSimpleVertexArrJsonMulti(queryOption: QueryOption,
+                                 resultWithExcludeLs: 
Seq[(Seq[QueryRequestWithResult], Seq[QueryRequestWithResult])],
+                                 excludes: Seq[QueryRequestWithResult]): 
JsValue = {
+    val excludeIds = (Seq((Seq.empty, excludes)) ++ 
resultWithExcludeLs).foldLeft(Map.empty[Int, Boolean]) { case (acc, (result, 
excludes)) =>
+      acc ++ resultInnerIds(excludes).map(hashKey => hashKey -> true).toMap
+    }
+
+    val (degrees, rawEdges) = (ListBuffer.empty[JsValue], 
ListBuffer.empty[RAW_EDGE])
+    for {
+      (result, localExclude) <- resultWithExcludeLs
+    } {
+      val newResult = result.map { queryRequestWithResult =>
+        val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+        val newQuery = queryRequest.query.copy(queryOption = queryOption)
+        queryRequestWithResult.copy(queryRequest = queryRequest.copy(query = 
newQuery))
+      }
+      val (_degrees, _rawEdges) = buildRawEdges(queryOption, newResult, 
excludeIds)
+      degrees ++= _degrees
+      rawEdges ++= _rawEdges
+    }
+    buildResultJsValue(queryOption, degrees, rawEdges)
+  }
+
+  def toSimpleVertexArrJson(queryOption: QueryOption,
+                            queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
+                            exclude: Seq[QueryRequestWithResult]): JsValue = {
+    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+    val (degrees, rawEdges) = buildRawEdges(queryOption, 
queryRequestWithResultLs, excludeIds)
+    buildResultJsValue(queryOption, degrees, rawEdges)
+  }
+
+
+  def toSimpleVertexArrJson(queryRequestWithResultLs: 
Seq[QueryRequestWithResult],
+                            exclude: Seq[QueryRequestWithResult]): JsValue = {
+
+    queryRequestWithResultLs.headOption.map { queryRequestWithResult =>
+      val (queryRequest, _) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      val query = queryRequest.query
+      val queryOption = query.queryOption
+      val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+      val (degrees, rawEdges) = buildRawEdges(queryOption, 
queryRequestWithResultLs, excludeIds)
+      buildResultJsValue(queryOption, degrees, rawEdges)
+    } getOrElse emptyResults
+  }
+
+  def verticesToJson(vertices: Iterable[Vertex]) = {
+    Json.toJson(vertices.flatMap { v => vertexToJson(v) })
+  }
+
+  def propsToJson(edge: Edge, q: Query, queryParam: QueryParam): Map[String, 
JsValue] = {
+    for {
+      (seq, innerValWithTs) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
+      labelMeta <- queryParam.label.metaPropsMap.get(seq)
+      jsValue <- innerValToJsValue(innerValWithTs.innerVal, labelMeta.dataType)
+    } yield labelMeta.name -> jsValue
+  }
+
+  private def edgeParent(parentEdges: Seq[EdgeWithScore], q: Query, 
queryParam: QueryParam): JsValue = {
+    if (parentEdges.isEmpty) {
+      JsNull
+    } else {
+      val parents = for {
+        parent <- parentEdges
+        (parentEdge, parentScore) = EdgeWithScore.unapply(parent).get
+        parentQueryParam = QueryParam(parentEdge.labelWithDir)
+        parents = edgeParent(parentEdge.parentEdges, q, parentQueryParam) if 
parents != JsNull
+      } yield {
+        val originalEdge = parentEdge.originalEdgeOpt.getOrElse(parentEdge)
+        val edgeJson = edgeToJsonInner(originalEdge, parentScore, q, 
parentQueryParam) + ("parents" -> parents)
+        Json.toJson(edgeJson)
+      }
+
+      Json.toJson(parents)
+    }
+  }
+
+  /** TODO */
+  def edgeToJsonInner(edge: Edge, score: Double, q: Query, queryParam: 
QueryParam): Map[String, JsValue] = {
+    val (srcColumn, tgtColumn) = 
queryParam.label.srcTgtColumn(edge.labelWithDir.dir)
+
+    val kvMapOpt = for {
+      from <- innerValToJsValue(edge.srcVertex.id.innerId, 
srcColumn.columnType)
+      to <- innerValToJsValue(edge.tgtVertex.id.innerId, tgtColumn.columnType)
+    } yield {
+      val targetColumns = if (q.selectColumnsSet.isEmpty) reservedColumns else 
(reservedColumns & q.selectColumnsSet) + "props"
+      val _propsMap = queryParam.label.metaPropsDefaultMapInner ++ 
propsToJson(edge, q, queryParam)
+      val propsMap = if (q.selectColumnsSet.nonEmpty) 
_propsMap.filterKeys(q.selectColumnsSet) else _propsMap
+
+      val kvMap = targetColumns.foldLeft(Map.empty[String, JsValue]) { (map, 
column) =>
+        val jsValue = column match {
+          case "cacheRemain" => JsNumber(queryParam.cacheTTLInMillis - 
(System.currentTimeMillis() - queryParam.timestamp))
+          case "from" => from
+          case "to" => to
+          case "label" => JsString(queryParam.label.label)
+          case "direction" => 
JsString(GraphUtil.fromDirection(edge.labelWithDir.dir))
+          case "_timestamp" | "timestamp" => JsNumber(edge.ts)
+          case "score" => JsNumber(score)
+          case "props" if propsMap.nonEmpty => Json.toJson(propsMap)
+          case _ => JsNull
+        }
+
+        if (jsValue == JsNull) map else map + (column -> jsValue)
+      }
+      kvMap
+    }
+
+    kvMapOpt.getOrElse(Map.empty)
+  }
+
+  def edgeToJson(edge: Edge, score: Double, q: Query, queryParam: QueryParam): 
Map[String, JsValue] = {
+    val kvs = edgeToJsonInner(edge, score, q, queryParam)
+    if (kvs.nonEmpty && q.returnTree) kvs + ("parents" -> 
Json.toJson(edgeParent(edge.parentEdges, q, queryParam)))
+    else kvs
+  }
+
+  def vertexToJson(vertex: Vertex): Option[JsObject] = {
+    val serviceColumn = ServiceColumn.findById(vertex.id.colId)
+
+    for {
+      id <- innerValToJsValue(vertex.innerId, serviceColumn.columnType)
+    } yield {
+      Json.obj("serviceName" -> serviceColumn.service.serviceName,
+        "columnName" -> serviceColumn.columnName,
+        "id" -> id, "props" -> propsToJson(vertex),
+        "timestamp" -> vertex.ts,
+        //        "belongsTo" -> vertex.belongLabelIds)
+        "belongsTo" -> 
vertex.belongLabelIds.flatMap(Label.findByIdOpt(_).map(_.label)))
+    }
+  }
+
+  private def keysToName(seqsToNames: Map[Int, String], props: Map[Int, 
InnerValLike]) = {
+    for {
+      (seq, value) <- props
+      name <- seqsToNames.get(seq)
+    } yield (name, value)
+  }
+
+  private def propsToJson(vertex: Vertex) = {
+    val serviceColumn = vertex.serviceColumn
+    val props = for {
+      (propKey, innerVal) <- vertex.props
+      columnMeta <- ColumnMeta.findByIdAndSeq(serviceColumn.id.get, 
propKey.toByte, useCache = true)
+      jsValue <- innerValToJsValue(innerVal, columnMeta.dataType)
+    } yield {
+      (columnMeta.name -> jsValue)
+    }
+    props.toMap
+  }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def propsToJson(edge: Edge) = {
+    for {
+      (seq, v) <- edge.propsWithTs if LabelMeta.isValidSeq(seq)
+      metaProp <- edge.label.metaPropsMap.get(seq)
+      jsValue <- innerValToJsValue(v.innerVal, metaProp.dataType)
+    } yield {
+      (metaProp.name, jsValue)
+    }
+  }
+
+  @deprecated(message = "deprecated", since = "0.2")
+  def summarizeWithListExclude(queryRequestWithResultLs: 
Seq[QueryRequestWithResult], exclude: Seq[QueryRequestWithResult]): JsObject = {
+    val excludeIds = resultInnerIds(exclude).map(innerId => innerId -> 
true).toMap
+
+
+    val groupedEdgesWithRank = (for {
+      queryRequestWithResult <- queryRequestWithResultLs
+      (queryRequest, queryResult) = 
QueryRequestWithResult.unapply(queryRequestWithResult).get
+      edgeWithScore <- queryResult.edgeWithScoreLs
+      (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+      if !excludeIds.contains(toHashKey(edge, queryRequest.queryParam, 
queryRequest.query.filterOutFields))
+    } yield {
+      (edge, score)
+    }).groupBy { case (edge, score) =>
+      (edge.label.tgtColumn, edge.label.srcColumn, edge.tgtVertex.innerId)
+    }
+
+    val jsons = for {
+      ((tgtColumn, srcColumn, target), edgesAndRanks) <- groupedEdgesWithRank
+      (edges, ranks) = edgesAndRanks.groupBy(x => 
x._1.srcVertex).map(_._2.head).unzip
+      tgtId <- innerValToJsValue(target, tgtColumn.columnType)
+    } yield {
+      Json.obj(tgtColumn.columnName -> tgtId,
+        s"${srcColumn.columnName}s" ->
+          edges.flatMap(edge => innerValToJsValue(edge.srcVertex.innerId, 
srcColumn.columnType)), "scoreSum" -> ranks.sum)
+    }
+    val sortedJsons = jsons.toList.sortBy { jsObj => (jsObj \ 
"scoreSum").as[Double] }.reverse
+    if (queryRequestWithResultLs.isEmpty) {
+      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons)
+    } else {
+      Json.obj("size" -> sortedJsons.size, "results" -> sortedJsons,
+        "impressionId" -> 
queryRequestWithResultLs.head.queryRequest.query.impressionId())
+    }
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
new file mode 100644
index 0000000..2febadd
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -0,0 +1,583 @@
+package org.apache.s2graph.core
+
+import com.google.common.hash.Hashing
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.parsers.{Where, WhereParser}
+import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, 
InnerValLike, LabelWithDirection}
+import org.hbase.async.ColumnRangeFilter
+import play.api.libs.json.{JsNull, JsNumber, JsValue, Json}
+
+import scala.util.hashing.MurmurHash3
+import scala.util.{Success, Try}
+
+object Query {
+  val initialScore = 1.0
+  lazy val empty = Query()
+
+  def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = 
Query(srcVertices, Vector(Step(List(queryParam))))
+
+  object DuplicatePolicy extends Enumeration {
+    type DuplicatePolicy = Value
+    val First, Sum, CountSum, Raw = Value
+
+    def apply(policy: String): Value = {
+      policy match {
+        case "sum" => Query.DuplicatePolicy.Sum
+        case "countSum" => Query.DuplicatePolicy.CountSum
+        case "raw" => Query.DuplicatePolicy.Raw
+        case _ => DuplicatePolicy.First
+      }
+    }
+  }
+}
+
+case class MultiQuery(queries: Seq[Query],
+                      weights: Seq[Double],
+                      queryOption: QueryOption,
+                      jsonQuery: JsValue = JsNull)
+
+case class QueryOption(removeCycle: Boolean = false,
+                       selectColumns: Seq[String] = Seq.empty,
+                       groupByColumns: Seq[String] = Seq.empty,
+                       orderByColumns: Seq[(String, Boolean)] = Seq.empty,
+                       filterOutQuery: Option[Query] = None,
+                       filterOutFields: Seq[String] = Seq(LabelMeta.to.name),
+                       withScore: Boolean = true,
+                       returnTree: Boolean = false,
+                       limitOpt: Option[Int] = None,
+                       returnAgg: Boolean = true,
+                       scoreThreshold: Double = Double.MinValue,
+                       returnDegree: Boolean = true)
+
+
+case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
+                 steps: IndexedSeq[Step] = Vector.empty[Step],
+                 queryOption: QueryOption = QueryOption(),
+                 jsonQuery: JsValue = JsNull) {
+
+  val removeCycle = queryOption.removeCycle
+  val selectColumns = queryOption.selectColumns
+//  val groupBy = queryOption.groupBy
+  val groupByColumns = queryOption.groupByColumns
+  val orderByColumns = queryOption.orderByColumns
+  val filterOutQuery = queryOption.filterOutQuery
+  val filterOutFields = queryOption.filterOutFields
+  val withScore = queryOption.withScore
+  val returnTree = queryOption.returnTree
+  val limitOpt = queryOption.limitOpt
+  val returnAgg = queryOption.returnAgg
+  val returnDegree = queryOption.returnDegree
+
+  def cacheKeyBytes: Array[Byte] = {
+    val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
+    val groupBytes = Bytes.toBytes(queryOption.groupByColumns.toString)
+    val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
+    val filterOutBytes = 
queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
+    val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
+
+    Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, 
returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add)
+  }
+
+  lazy val selectColumnsSet = queryOption.selectColumns.map { c =>
+    if (c == "_from") "from"
+    else if (c == "_to") "to"
+    else c
+  }.toSet
+
+  /** return logical query id without considering parameter values */
+  def templateId(): JsValue = {
+    Json.toJson(for {
+      step <- steps
+      queryParam <- step.queryParams.sortBy(_.labelWithDir.labelId)
+    } yield {
+        Json.obj("label" -> queryParam.label.label, "direction" -> 
GraphUtil.fromDirection(queryParam.labelWithDir.dir))
+      })
+  }
+
+  def impressionId(): JsNumber = {
+    val hash = MurmurHash3.stringHash(templateId().toString())
+    JsNumber(hash)
+  }
+
+  def cursorStrings(): Seq[Seq[String]] = {
+    //Don`t know how to replace all cursor keys in json
+    steps.map { step =>
+      step.queryParams.map { queryParam =>
+        queryParam.cursorOpt.getOrElse("")
+      }
+    }
+  }
+}
+
+object EdgeTransformer {
+  val DefaultTransformField = Json.arr("_to")
+  val DefaultTransformFieldAsList = Json.arr("_to").as[List[String]]
+  val DefaultJson = Json.arr(DefaultTransformField)
+}
+
+/**
+ * TODO: step wise outputFields should be used with nextStepLimit, 
nextStepThreshold.
+ * @param jsValue
+ */
+case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) {
+  val Delimiter = "\\$"
+  val targets = jsValue.asOpt[List[Vector[String]]].toList
+  val fieldsLs = for {
+    target <- targets
+    fields <- target
+  } yield fields
+  val isDefault = fieldsLs.size == 1 && fieldsLs.head.size == 1 && 
(fieldsLs.head.head == "_to" || fieldsLs.head.head == "to")
+
+  def toHashKeyBytes: Array[Byte] = if (isDefault) Array.empty[Byte] else 
Bytes.toBytes(jsValue.toString)
+
+  def replace(fmt: String,
+              values: Seq[InnerValLike],
+              nextStepOpt: Option[Step]): Seq[InnerValLike] = {
+
+    val tokens = fmt.split(Delimiter)
+    val _values = values.padTo(tokens.length, InnerVal.withStr("", 
queryParam.label.schemaVersion))
+    val mergedStr = tokens.zip(_values).map { case (prefix, innerVal) => 
prefix + innerVal.toString }.mkString
+    //    logger.error(s"${tokens.toList}, ${values}, $mergedStr")
+    //    println(s"${tokens.toList}, ${values}, $mergedStr")
+    nextStepOpt match {
+      case None =>
+        val columnType =
+          if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) 
queryParam.label.tgtColumnType
+          else queryParam.label.srcColumnType
+
+        if (columnType == InnerVal.STRING) Seq(InnerVal.withStr(mergedStr, 
queryParam.label.schemaVersion))
+        else Nil
+      case Some(nextStep) =>
+        val nextQueryParamsValid = nextStep.queryParams.filter { qParam =>
+          if (qParam.labelWithDir.dir == GraphUtil.directions("out")) 
qParam.label.srcColumnType == "string"
+          else qParam.label.tgtColumnType == "string"
+        }
+        for {
+          nextQueryParam <- nextQueryParamsValid
+        } yield {
+          InnerVal.withStr(mergedStr, nextQueryParam.label.schemaVersion)
+        }
+    }
+  }
+
+  def toInnerValOpt(edge: Edge, fieldName: String): Option[InnerValLike] = {
+    fieldName match {
+      case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
+      case LabelMeta.from.name => Option(edge.srcVertex.innerId)
+      case _ =>
+        for {
+          labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName)
+          value <- edge.propsWithTs.get(labelMeta.seq)
+        } yield value.innerVal
+    }
+  }
+
+  def transform(edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
+    if (isDefault) Seq(edge)
+    else {
+      val edges = for {
+        fields <- fieldsLs
+        innerVal <- {
+          if (fields.size == 1) {
+            val fieldName = fields.head
+            toInnerValOpt(edge, fieldName).toSeq
+          } else {
+            val fmt +: fieldNames = fields
+            replace(fmt, fieldNames.flatMap(fieldName => toInnerValOpt(edge, 
fieldName)), nextStepOpt)
+          }
+        }
+      } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = 
Option(edge))
+
+
+      edges
+    }
+  }
+}
+
+object Step {
+  val Delimiter = "|"
+}
+
+case class Step(queryParams: List[QueryParam],
+                labelWeights: Map[Int, Double] = Map.empty,
+                //                scoreThreshold: Double = 0.0,
+                nextStepScoreThreshold: Double = 0.0,
+                nextStepLimit: Int = -1,
+                cacheTTL: Long = -1) {
+
+  lazy val excludes = queryParams.filter(_.exclude)
+  lazy val includes = queryParams.filterNot(_.exclude)
+  lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
+
+  def toCacheKey(lss: Seq[Long]): Long = 
Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong()
+//    MurmurHash3.bytesHash(toCacheKeyRaw(lss))
+
+  def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = {
+    var bytes = Array.empty[Byte]
+    lss.sorted.foreach { h => bytes = Bytes.add(bytes, Bytes.toBytes(h)) }
+    bytes
+  }
+}
+
+case class VertexParam(vertices: Seq[Vertex]) {
+  var filters: Option[Map[Byte, InnerValLike]] = None
+
+  def has(what: Option[Map[Byte, InnerValLike]]): VertexParam = {
+    what match {
+      case None => this
+      case Some(w) => has(w)
+    }
+  }
+
+  def has(what: Map[Byte, InnerValLike]): VertexParam = {
+    this.filters = Some(what)
+    this
+  }
+
+}
+
+//object RankParam {
+//  def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = {
+//    new RankParam(labelId, keyAndWeights)
+//  }
+//}
+
+case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = 
Seq.empty[(Byte, Double)]) {
+  // empty => Count
+  lazy val rankKeysWeightsMap = keySeqAndWeights.toMap
+
+  def defaultKey() = {
+    this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0))
+    this
+  }
+
+  def toHashKeyBytes(): Array[Byte] = {
+    var bytes = Array.empty[Byte]
+    keySeqAndWeights.map { case (key, weight) =>
+      bytes = Bytes.add(bytes, Array.fill(1)(key), Bytes.toBytes(weight))
+    }
+    bytes
+  }
+}
+
+object QueryParam {
+  lazy val Empty = QueryParam(LabelWithDirection(0, 0))
+  lazy val DefaultThreshold = Double.MinValue
+  val Delimiter = ","
+}
+
+case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = 
System.currentTimeMillis()) {
+
+  import HBaseSerializable._
+  import Query.DuplicatePolicy
+  import Query.DuplicatePolicy._
+
+  lazy val label = Label.findById(labelWithDir.labelId)
+  val DefaultKey = LabelIndex.DefaultSeq
+  val fullKey = DefaultKey
+
+  var labelOrderSeq = fullKey
+
+  var sample = -1
+  var limit = 10
+  var offset = 0
+  var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1))
+
+  var duration: Option[(Long, Long)] = None
+  var isInverted: Boolean = false
+
+  var columnRangeFilter: ColumnRangeFilter = null
+
+  var hasFilters: Map[Byte, InnerValLike] = Map.empty[Byte, InnerValLike]
+  var where: Try[Where] = Success(WhereParser.success)
+  var whereRawOpt: Option[String] = None
+  var duplicatePolicy = DuplicatePolicy.First
+  var rpcTimeoutInMillis = 1000
+  var maxAttempt = 2
+  var includeDegree = false
+  var tgtVertexInnerIdOpt: Option[InnerValLike] = None
+  var cacheTTLInMillis: Long = -1L
+  var threshold = QueryParam.DefaultThreshold
+  var timeDecay: Option[TimeDecay] = None
+  var transformer: EdgeTransformer = EdgeTransformer(this, 
EdgeTransformer.DefaultJson)
+  var scorePropagateOp: String = "multiply"
+  var exclude = false
+  var include = false
+  var shouldNormalize= false
+  var cursorOpt: Option[String] = None
+
+  var columnRangeFilterMinBytes = Array.empty[Byte]
+  var columnRangeFilterMaxBytes = Array.empty[Byte]
+
+  lazy val srcColumnWithDir = label.srcColumnWithDir(labelWithDir.dir)
+  lazy val tgtColumnWithDir = label.tgtColumnWithDir(labelWithDir.dir)
+
+  def toBytes(idxSeq: Byte, offset: Int, limit: Int, isInverted: Boolean): 
Array[Byte] = {
+    val front = Array[Byte](idxSeq, if (isInverted) 1.toByte else 0.toByte)
+    Bytes.add(front, Bytes.toBytes((offset.toLong << 32 | limit)))
+  }
+
+  /**
+   * consider only I/O specific parameters.
+   * properties that is used on Graph.filterEdges should not be considered.
+   * @param bytes
+   * @return
+   */
+  def toCacheKey(bytes: Array[Byte]): Long = {
+    val hashBytes = toCacheKeyRaw(bytes)
+    Hashing.murmur3_128().hashBytes(hashBytes).asLong()
+//    MurmurHash3.bytesHash(hashBytes)
+  }
+
+  def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = {
+    val transformBytes = transformer.toHashKeyBytes
+    //TODO: change this to binrary format.
+    val whereBytes = Bytes.toBytes(whereRawOpt.getOrElse(""))
+    val durationBytes = duration.map { case (min, max) =>
+      val minTs = min / cacheTTLInMillis
+      val maxTs = max / cacheTTLInMillis
+      Bytes.add(Bytes.toBytes(minTs), Bytes.toBytes(maxTs))
+    } getOrElse Array.empty[Byte]
+//    Bytes.toBytes(duration.toString)
+    val conditionBytes = Bytes.add(transformBytes, whereBytes, durationBytes)
+    Bytes.add(Bytes.add(bytes, labelWithDir.bytes, toBytes(labelOrderSeq, 
offset, limit, isInverted)), rank.toHashKeyBytes(),
+      Bytes.add(columnRangeFilterMinBytes, columnRangeFilterMaxBytes, 
conditionBytes))
+  }
+
+  def isInverted(isInverted: Boolean): QueryParam = {
+    this.isInverted = isInverted
+    this
+  }
+
+  def labelOrderSeq(labelOrderSeq: Byte): QueryParam = {
+    this.labelOrderSeq = labelOrderSeq
+    this
+  }
+
+  def sample(n: Int): QueryParam = {
+    this.sample = n
+    this
+  }
+
+  def limit(offset: Int, limit: Int): QueryParam = {
+    /** since degree info is located on first always */
+    if (offset == 0 && this.columnRangeFilter == null) {
+      this.limit = limit + 1
+      this.offset = offset
+    } else {
+      this.limit = limit
+      this.offset = offset + 1
+    }
+    //    this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, 
this.offset)
+    this
+  }
+
+  def interval(fromTo: Option[(Seq[(Byte, InnerValLike)], Seq[(Byte, 
InnerValLike)])]): QueryParam = {
+    fromTo match {
+      case Some((from, to)) => interval(from, to)
+      case _ => this
+    }
+  }
+
+  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, 
InnerValLike)]): QueryParam = {
+    //    val len = label.orderTypes.size.toByte
+    //    val len = 
label.extraIndicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+    //    logger.error(s"indicesMap: ${label.indicesMap(labelOrderSeq)}")
+    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+
+    val minMetaByte = InnerVal.minMetaByte
+    //    val maxMetaByte = InnerVal.maxMetaByte
+    val maxMetaByte = -1.toByte
+    val toVal = Bytes.add(propsToBytes(to), Array.fill(1)(minMetaByte))
+    //FIXME
+    val fromVal = Bytes.add(propsToBytes(from), Array.fill(10)(maxMetaByte))
+    toVal(0) = len
+    fromVal(0) = len
+    val maxBytes = fromVal
+    val minBytes = toVal
+    this.columnRangeFilterMaxBytes = maxBytes
+    this.columnRangeFilterMinBytes = minBytes
+    val rangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, true)
+    this.columnRangeFilter = rangeFilter
+    this
+  }
+
+  def duration(minMaxTs: Option[(Long, Long)]): QueryParam = {
+    minMaxTs match {
+      case Some((minTs, maxTs)) => duration(minTs, maxTs)
+      case _ => this
+    }
+  }
+
+  def duration(minTs: Long, maxTs: Long): QueryParam = {
+    this.duration = Some((minTs, maxTs))
+    this
+  }
+
+  def rank(r: RankParam): QueryParam = {
+    this.rank = r
+    this
+  }
+
+  def exclude(filterOut: Boolean): QueryParam = {
+    this.exclude = filterOut
+    this
+  }
+
+  def include(filterIn: Boolean): QueryParam = {
+    this.include = filterIn
+    this
+  }
+
+  def has(hasFilters: Map[Byte, InnerValLike]): QueryParam = {
+    this.hasFilters = hasFilters
+    this
+  }
+
+  def where(whereTry: Try[Where]): QueryParam = {
+    this.where = whereTry
+    this
+  }
+
+  def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = {
+    this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First)
+    this
+  }
+
+  def rpcTimeout(millis: Int): QueryParam = {
+    this.rpcTimeoutInMillis = millis
+    this
+  }
+
+  def maxAttempt(attempt: Int): QueryParam = {
+    this.maxAttempt = attempt
+    this
+  }
+
+  def includeDegree(includeDegree: Boolean): QueryParam = {
+    this.includeDegree = includeDegree
+    this
+  }
+
+  def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = {
+    this.tgtVertexInnerIdOpt = other
+    this
+  }
+
+  def cacheTTLInMillis(other: Long): QueryParam = {
+    this.cacheTTLInMillis = other
+    this
+  }
+
+  def timeDecay(other: Option[TimeDecay]): QueryParam = {
+    this.timeDecay = other
+    this
+  }
+
+  def threshold(other: Double): QueryParam = {
+    this.threshold = other
+    this
+  }
+
+  def transformer(other: Option[JsValue]): QueryParam = {
+    other match {
+      case Some(js) => this.transformer = EdgeTransformer(this, js)
+      case None =>
+    }
+    this
+  }
+
+  def scorePropagateOp(scorePropagateOp: String): QueryParam = {
+    this.scorePropagateOp = scorePropagateOp
+    this
+  }
+
+  def shouldNormalize(shouldNormalize: Boolean): QueryParam = {
+    this.shouldNormalize = shouldNormalize
+    this
+  }
+
+  def whereRawOpt(sqlOpt: Option[String]): QueryParam = {
+    this.whereRawOpt = sqlOpt
+    this
+  }
+
+  def cursorOpt(cursorOpt: Option[String]): QueryParam = {
+    this.cursorOpt = cursorOpt
+    this
+  }
+
+  def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined
+
+  override def toString = {
+    List(label.label, labelOrderSeq, offset, limit, rank,
+      duration, isInverted, exclude, include, hasFilters).mkString("\t")
+    //      duration, isInverted, exclude, include, hasFilters, 
outputFields).mkString("\t")
+  }
+
+  //
+  //  def buildGetRequest(srcVertex: Vertex) = {
+  //    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
+  //    val (srcInnerId, tgtInnerId) = tgtVertexInnerIdOpt match {
+  //      case Some(tgtVertexInnerId) => // _to is given.
+  //        /** we use toInvertedEdgeHashLike so dont need to swap src, tgt */
+  //        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
+  //        val tgt = InnerVal.convertVersion(tgtVertexInnerId, 
tgtColumn.columnType, label.schemaVersion)
+  //        (src, tgt)
+  //      case None =>
+  //        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
+  //        (src, src)
+  //    }
+  //
+  //    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), 
TargetVertexId(tgtColumn.id.get, tgtInnerId))
+  //    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
+  //    val edge = Edge(srcV, tgtV, labelWithDir)
+  //
+  //    val get = if (tgtVertexInnerIdOpt.isDefined) {
+  //      val snapshotEdge = edge.toInvertedEdgeHashLike
+  //      val kv = snapshotEdge.kvs.head
+  //      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, 
kv.qualifier)
+  //    } else {
+  //      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq 
== labelOrderSeq)
+  //      assert(indexedEdgeOpt.isDefined)
+  //      val indexedEdge = indexedEdgeOpt.get
+  //      val kv = indexedEdge.kvs.head
+  //      val table = label.hbaseTableName.getBytes
+  //        //kv.table //
+  //      val rowKey = kv.row // indexedEdge.rowKey.bytes
+  //      val cf = edgeCf
+  //      new GetRequest(table, rowKey, cf)
+  //    }
+  //
+  //    val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue))
+  //
+  //    get.maxVersions(1)
+  //    get.setFailfast(true)
+  //    get.setMaxResultsPerColumnFamily(limit)
+  //    get.setRowOffsetPerColumnFamily(offset)
+  //    get.setMinTimestamp(minTs)
+  //    get.setMaxTimestamp(maxTs)
+  //    get.setTimeout(rpcTimeoutInMillis)
+  //    if (columnRangeFilter != null) get.setFilter(columnRangeFilter)
+  //    //    get.setMaxAttempt(maxAttempt.toByte)
+  //    //    get.setRpcTimeout(rpcTimeoutInMillis)
+  //
+  //    //    if (columnRangeFilter != null) get.filter(columnRangeFilter)
+  //    //    logger.debug(s"Get: $get, $offset, $limit")
+  //
+  //    get
+  //  }
+}
+
+case class TimeDecay(initial: Double = 1.0,
+                     lambda: Double = 0.1,
+                     timeUnit: Double = 60 * 60 * 24,
+                     labelMetaSeq: Byte = LabelMeta.timeStampSeq) {
+  def decay(diff: Double): Double = {
+    //FIXME
+    val ret = initial * Math.pow(1.0 - lambda, diff / timeUnit)
+    //    logger.debug(s"$initial, $lambda, $timeUnit, $diff, ${diff / 
timeUnit}, $ret")
+    ret
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
new file mode 100644
index 0000000..550c4c9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -0,0 +1,42 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+
+import scala.collection.Seq
+
+object QueryResult {
+  def fromVertices(query: Query): Seq[QueryRequestWithResult] = {
+    if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
+      Seq.empty
+    } else {
+      val queryParam = query.steps.head.queryParams.head
+      val label = queryParam.label
+      val currentTs = System.currentTimeMillis()
+      val propsWithTs = Map(LabelMeta.timeStampSeq ->
+        InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs))
+      for {
+        vertex <- query.vertices
+      } yield {
+        val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs = 
propsWithTs)
+        val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+        QueryRequestWithResult(QueryRequest(query, -1, vertex, queryParam),
+          QueryResult(edgeWithScoreLs = Seq(edgeWithScore)))
+      }
+    }
+  }
+}
+case class QueryRequestWithResult(queryRequest: QueryRequest, queryResult: 
QueryResult)
+
+case class QueryRequest(query: Query,
+                        stepIdx: Int,
+                        vertex: Vertex,
+                        queryParam: QueryParam)
+
+
+case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
+                       tailCursor: Array[Byte] = Array.empty,
+                       timestamp: Long = System.currentTimeMillis(),
+                       isFailure: Boolean = false)
+
+case class EdgeWithScore(edge: Edge, score: Double)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
new file mode 100644
index 0000000..f4f40b7
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -0,0 +1,101 @@
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, 
VertexId}
+import play.api.libs.json.Json
+
+case class Vertex(id: VertexId,
+                  ts: Long = System.currentTimeMillis(),
+                  props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
+                  op: Byte = 0,
+                  belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement {
+
+  val innerId = id.innerId
+
+  def schemaVer = serviceColumn.schemaVersion
+
+  def serviceColumn = ServiceColumn.findById(id.colId)
+
+  def service = Service.findById(serviceColumn.serviceId)
+
+  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
service.hTableName)
+
+  def defaultProps = Map(ColumnMeta.lastModifiedAtColumnSeq.toInt -> 
InnerVal.withLong(ts, schemaVer))
+
+  //  lazy val kvs = Graph.client.vertexSerializer(this).toKeyValues
+
+  /** TODO: make this as configurable */
+  override def serviceName = service.serviceName
+
+  override def isAsync = false
+
+  override def queueKey = Seq(ts.toString, serviceName).mkString("|")
+
+  override def queuePartitionKey = id.innerId.toString
+
+  def propsWithName = for {
+    (seq, v) <- props
+    meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte)
+  } yield (meta.name -> v.toString)
+
+  //  /** only used by bulk loader */
+  //  def buildPuts(): List[Put] = {
+  //    //    logger.error(s"put: $this => $rowKey")
+  ////    val put = new Put(rowKey.bytes)
+  ////    for ((q, v) <- qualifiersWithValues) {
+  ////      put.addColumn(vertexCf, q, ts, v)
+  ////    }
+  ////    List(put)
+  //    val kv = kvs.head
+  //    val put = new Put(kv.row)
+  //    kvs.map { kv =>
+  //      put.addColumn(kv.cf, kv.qualifier, kv.timestamp, kv.value)
+  //    }
+  //    List(put)
+  //  }
+
+  def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op)
+
+
+  override def hashCode() = {
+    val hash = id.hashCode()
+    //    logger.debug(s"Vertex.hashCode: $this -> $hash")
+    hash
+  }
+
+  override def equals(obj: Any) = {
+    obj match {
+      case otherVertex: Vertex =>
+        val ret = id == otherVertex.id
+        //        logger.debug(s"Vertex.equals: $this, $obj => $ret")
+        ret
+      case _ => false
+    }
+  }
+
+  def withProps(newProps: Map[Int, InnerValLike]) = Vertex(id, ts, newProps, 
op)
+
+  def toLogString(): String = {
+    val (serviceName, columnName) =
+      if (!id.storeColId) ("", "")
+      else (serviceColumn.service.serviceName, serviceColumn.columnName)
+
+    if (propsWithName.nonEmpty)
+      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, columnName, 
Json.toJson(propsWithName)).mkString("\t")
+    else
+      Seq(ts, GraphUtil.fromOp(op), "v", id.innerId, serviceName, 
columnName).mkString("\t")
+  }
+}
+
+object Vertex {
+
+  def toPropKey(labelId: Int): Int = Byte.MaxValue + labelId
+
+  def toLabelId(propKey: Int): Int = propKey - Byte.MaxValue
+
+  def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
+
+  //  val emptyVertex = Vertex(new CompositeId(CompositeId.defaultColId, 
CompositeId.defaultInnerId, false, true),
+  //    System.currentTimeMillis())
+  def fromString(s: String): Option[Vertex] = Graph.toVertex(s)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
new file mode 100644
index 0000000..c92c2a2
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
@@ -0,0 +1,66 @@
+package org.apache.s2graph.core.mysqls
+
+/**
+ * Created by shon on 8/5/15.
+ */
+
+import scalikejdbc._
+
+object Bucket extends Model[Bucket] {
+
+  val rangeDelimiter = "~"
+  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
+
+  def apply(rs: WrappedResultSet): Bucket = {
+    Bucket(rs.intOpt("id"),
+      rs.int("experiment_id"),
+      rs.string("modular"),
+      rs.string("http_verb"),
+      rs.string("api_path"),
+      rs.string("request_body"),
+      rs.int("timeout"),
+      rs.string("impression_id"),
+      rs.boolean("is_graph_query"),
+      rs.boolean("is_empty"))
+  }
+
+  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): 
List[Bucket] = {
+    val cacheKey = "experimentId=" + experimentId
+    withCaches(cacheKey) {
+      sql"""select * from buckets where experiment_id = $experimentId"""
+        .map { rs => Bucket(rs) }.list().apply()
+    }
+  }
+
+  def toRange(str: String): Option[(Int, Int)] = {
+    val range = str.split(rangeDelimiter)
+    if (range.length == 2) Option(range.head.toInt, range.last.toInt)
+    else None
+  }
+
+  def findByImpressionId(impressionId: String, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
+    val cacheKey = "impressionId=" + impressionId
+    val sql = sql"""select * from buckets where impression_id=$impressionId"""
+      .map { rs => Bucket(rs)}
+    if (useCache) {
+      withCache(cacheKey) {
+        sql.single().apply()
+      }
+    } else {
+      sql.single().apply()
+    }
+  }
+}
+
+case class Bucket(id: Option[Int],
+                  experimentId: Int,
+                  modular: String,
+                  httpVerb: String, apiPath: String,
+                  requestBody: String, timeout: Int, impressionId: String,
+                  isGraphQuery: Boolean = true,
+                  isEmpty: Boolean = false) {
+
+  import Bucket._
+
+  lazy val rangeOpt = toRange(modular)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
new file mode 100644
index 0000000..a726f40
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.core.mysqls
+
+import play.api.libs.json.Json
+import scalikejdbc._
+
+object ColumnMeta extends Model[ColumnMeta] {
+
+  val timeStampSeq = 0.toByte
+  val countSeq = -1.toByte
+  val lastModifiedAtColumnSeq = 0.toByte
+  val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", 
lastModifiedAtColumnSeq, "long")
+  val maxValue = Byte.MaxValue
+
+  def apply(rs: WrappedResultSet): ColumnMeta = {
+    ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), 
rs.byte("seq"), rs.string("data_type").toLowerCase())
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
+    //    val cacheKey = s"id=$id"
+    val cacheKey = "id=" + id
+    withCache(cacheKey) {
+      sql"""select * from column_metas where id = ${id}""".map { rs => 
ColumnMeta(rs) }.single.apply
+    }.get
+  }
+
+  def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit 
session: DBSession = AutoSession) = {
+    //    val cacheKey = s"columnId=$columnId"
+    val cacheKey = "columnId=" + columnId
+    if (useCache) {
+      withCaches(cacheKey)( sql"""select *from column_metas where column_id = 
${columnId} order by seq ASC"""
+        .map { rs => ColumnMeta(rs) }.list.apply())
+    } else {
+      sql"""select * from column_metas where column_id = ${columnId} order by 
seq ASC"""
+        .map { rs => ColumnMeta(rs) }.list.apply()
+    }
+  }
+
+  def findByName(columnId: Int, name: String)(implicit session: DBSession = 
AutoSession) = {
+    //    val cacheKey = s"columnId=$columnId:name=$name"
+    val cacheKey = "columnId=" + columnId + ":name=" + name
+    withCache(cacheKey)( sql"""select * from column_metas where column_id = 
${columnId} and name = ${name}"""
+      .map { rs => ColumnMeta(rs) }.single.apply())
+  }
+
+  def insert(columnId: Int, name: String, dataType: String)(implicit session: 
DBSession = AutoSession) = {
+    val ls = findAllByColumn(columnId, false)
+    val seq = ls.size + 1
+    if (seq <= maxValue) {
+      sql"""insert into column_metas(column_id, name, seq, data_type)
+    select ${columnId}, ${name}, ${seq}, ${dataType}"""
+        .updateAndReturnGeneratedKey.apply()
+    }
+  }
+
+  def findOrInsert(columnId: Int, name: String, dataType: String)(implicit 
session: DBSession = AutoSession): ColumnMeta = {
+    findByName(columnId, name) match {
+      case Some(c) => c
+      case None =>
+        insert(columnId, name, dataType)
+        expireCache(s"columnId=$columnId:name=$name")
+        findByName(columnId, name).get
+    }
+  }
+
+  def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession) = {
+    val cacheKey = "columnId=" + columnId + ":seq=" + seq
+    lazy val columnMetaOpt = sql"""
+        select * from column_metas where column_id = ${columnId} and seq = 
${seq}
+    """.map { rs => ColumnMeta(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(columnMetaOpt)
+    else columnMetaOpt
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val columnMeta = findById(id)
+    val (columnId, name) = (columnMeta.columnId, columnMeta.name)
+    sql"""delete from column_metas where id = ${id}""".execute.apply()
+    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", 
s"colunmId=$columnId")
+    cacheKeys.foreach { key =>
+      expireCache(key)
+      expireCaches(key)
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from column_metas""".map { rs => ColumnMeta(rs) 
}.list().apply()
+
+    putsToCache(ls.map { x =>
+      val cacheKey = s"id=${x.id.get}"
+      (cacheKey -> x)
+    })
+    putsToCache(ls.map { x =>
+      val cacheKey = s"columnId=${x.columnId}:name=${x.name}"
+      (cacheKey -> x)
+    })
+    putsToCache(ls.map { x =>
+      val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}"
+      (cacheKey -> x)
+    })
+    putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
+      val cacheKey = s"columnId=${columnId}"
+      (cacheKey -> ls)
+    }.toList)
+  }
+}
+
+case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, 
dataType: String) {
+  lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
new file mode 100644
index 0000000..3b5fefb
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -0,0 +1,77 @@
+package org.apache.s2graph.core.mysqls
+
+import org.apache.s2graph.core.GraphUtil
+import scalikejdbc._
+
+import scala.util.Random
+
+object Experiment extends Model[Experiment] {
+  val impressionKey = "S2-Impression-Id"
+
+  def apply(rs: WrappedResultSet): Experiment = {
+    Experiment(rs.intOpt("id"),
+      rs.int("service_id"),
+      rs.string("name"),
+      rs.string("description"),
+      rs.string("experiment_type"),
+      rs.int("total_modular"))
+  }
+
+  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): 
List[Experiment] = {
+    val cacheKey = "serviceId=" + serviceId
+    withCaches(cacheKey) {
+      sql"""select * from experiments where service_id = ${serviceId}"""
+        .map { rs => Experiment(rs) }.list().apply()
+    }
+  }
+
+  def findBy(serviceId: Int, name: String)(implicit session: DBSession = 
AutoSession): Option[Experiment] = {
+    val cacheKey = "serviceId=" + serviceId + ":name=" + name
+    withCache(cacheKey) {
+      sql"""select * from experiments where service_id = ${serviceId} and name 
= ${name}"""
+        .map { rs => Experiment(rs) }.single.apply
+    }
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession): 
Option[Experiment] = {
+    val cacheKey = "id=" + id
+    withCache(cacheKey)(
+      sql"""select * from experiments where id = ${id}"""
+        .map { rs => Experiment(rs) }.single.apply
+    )
+  }
+}
+
+case class Experiment(id: Option[Int],
+                      serviceId: Int,
+                      name: String,
+                      description: String,
+                      experimentType: String,
+                      totalModular: Int) {
+
+  def buckets = Bucket.finds(id.get)
+
+  def rangeBuckets = for {
+    bucket <- buckets
+    range <- bucket.rangeOpt
+  } yield range -> bucket
+
+
+  def findBucket(uuid: String, impIdOpt: Option[String] = None): 
Option[Bucket] = {
+    impIdOpt match {
+      case Some(impId) => Bucket.findByImpressionId(impId)
+      case None =>
+        val seed = experimentType match {
+          case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
+          case _ => Random.nextInt(totalModular) + 1
+        }
+        findBucket(seed)
+    }
+  }
+
+  def findBucket(uuidMod: Int): Option[Bucket] = {
+    rangeBuckets.find { case ((from, to), bucket) =>
+      from <= uuidMod && uuidMod <= to
+    }.map(_._2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
new file mode 100644
index 0000000..0958c2c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -0,0 +1,372 @@
+package org.apache.s2graph.core.mysqls
+
+import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{GraphExceptions, GraphUtil, JSONParser}
+import play.api.libs.json.Json
+import scalikejdbc._
+
+object Label extends Model[Label] {
+
+  val maxHBaseTableNames = 2
+
+  def apply(rs: WrappedResultSet): Label = {
+    Label(Option(rs.int("id")), rs.string("label"),
+      rs.int("src_service_id"), rs.string("src_column_name"), 
rs.string("src_column_type"),
+      rs.int("tgt_service_id"), rs.string("tgt_column_name"), 
rs.string("tgt_column_type"),
+      rs.boolean("is_directed"), rs.string("service_name"), 
rs.int("service_id"), rs.string("consistency_level"),
+      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), 
rs.string("schema_version"), rs.boolean("is_async"), 
rs.string("compressionAlgorithm"))
+  }
+
+  def deleteAll(label: Label)(implicit session: DBSession) = {
+    val id = label.id
+    LabelMeta.findAllByLabelId(id.get, false).foreach { x => 
LabelMeta.delete(x.id.get) }
+    LabelIndex.findByLabelIdAll(id.get, false).foreach { x => 
LabelIndex.delete(x.id.get) }
+    Label.delete(id.get)
+  }
+
+  def findByName(labelName: String, useCache: Boolean = true)(implicit 
session: DBSession = AutoSession): Option[Label] = {
+    val cacheKey = "label=" + labelName
+    lazy val labelOpt =
+      sql"""
+        select *
+        from labels
+        where label = ${labelName}""".map { rs => Label(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(labelOpt)
+    else labelOpt
+  }
+
+  def insert(label: String,
+             srcServiceId: Int,
+             srcColumnName: String,
+             srcColumnType: String,
+             tgtServiceId: Int,
+             tgtColumnName: String,
+             tgtColumnType: String,
+             isDirected: Boolean,
+             serviceName: String,
+             serviceId: Int,
+             consistencyLevel: String,
+             hTableName: String,
+             hTableTTL: Option[Int],
+             schemaVersion: String,
+             isAsync: Boolean,
+             compressionAlgorithm: String)(implicit session: DBSession = 
AutoSession) = {
+    sql"""
+       insert into labels(label,
+    src_service_id, src_column_name, src_column_type,
+    tgt_service_id, tgt_column_name, tgt_column_type,
+    is_directed, service_name, service_id, consistency_level, 
hbase_table_name, hbase_table_ttl, schema_version, is_async, 
compressionAlgorithm)
+       values (${label},
+    ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
+    ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
+    ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, 
${hTableName}, ${hTableTTL},
+    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm})
+    """
+      .updateAndReturnGeneratedKey.apply()
+  }
+
+  def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): 
Option[Label] = {
+    val cacheKey = "id=" + id
+    withCache(cacheKey)(
+      sql"""
+        select         *
+        from   labels
+        where  id = ${id}"""
+        .map { rs => Label(rs) }.single.apply())
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
+    val cacheKey = "id=" + id
+    withCache(cacheKey)(
+      sql"""
+        select         *
+        from   labels
+        where  id = ${id}"""
+        .map { rs => Label(rs) }.single.apply()).get
+  }
+
+  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
+    val cacheKey = "tgtColumnId=" + columnId
+    val col = ServiceColumn.findById(columnId)
+    withCaches(cacheKey)(
+      sql"""
+          select       *
+          from labels
+          where        tgt_column_name = ${col.columnName}
+          and service_id = ${col.serviceId}
+        """.map { rs => Label(rs) }.list().apply())
+  }
+
+  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
+    val cacheKey = "srcColumnId=" + columnId
+    val col = ServiceColumn.findById(columnId)
+    withCaches(cacheKey)(
+      sql"""
+          select       *
+          from labels
+          where        src_column_name = ${col.columnName}
+          and service_id = ${col.serviceId}
+        """.map { rs => Label(rs) }.list().apply())
+  }
+
+  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
+    val cacheKey = "srcServiceId=" + serviceId
+    withCaches(cacheKey)(
+      sql"""select * from labels where src_service_id = ${serviceId}""".map { 
rs => Label(rs) }.list().apply
+    )
+  }
+
+  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
+    val cacheKey = "tgtServiceId=" + serviceId
+    withCaches(cacheKey)(
+      sql"""select * from labels where tgt_service_id = ${serviceId}""".map { 
rs => Label(rs) }.list().apply
+    )
+  }
+
+  def insertAll(labelName: String, srcServiceName: String, srcColumnName: 
String, srcColumnType: String,
+                tgtServiceName: String, tgtColumnName: String, tgtColumnType: 
String,
+                isDirected: Boolean = true,
+                serviceName: String,
+                indices: Seq[Index],
+                metaProps: Seq[Prop],
+                consistencyLevel: String,
+                hTableName: Option[String],
+                hTableTTL: Option[Int],
+                schemaVersion: String,
+                isAsync: Boolean,
+                compressionAlgorithm: String)(implicit session: DBSession = 
AutoSession): Label = {
+
+    val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
+    val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
+    val serviceOpt = Service.findByName(serviceName, useCache = false)
+    if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service 
$srcServiceName is not created.")
+    if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service 
$tgtServiceName is not created.")
+    if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName 
is not created.")
+
+    val newLabel = for {
+      srcService <- srcServiceOpt
+      tgtService <- tgtServiceOpt
+      service <- serviceOpt
+    } yield {
+        val srcServiceId = srcService.id.get
+        val tgtServiceId = tgtService.id.get
+        val serviceId = service.id.get
+
+        /** insert serviceColumn */
+        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, 
Some(srcColumnType), schemaVersion)
+        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, 
Some(tgtColumnType), schemaVersion)
+
+        if (srcCol.columnType != srcColumnType) throw new 
RuntimeException(s"source service column type not matched ${srcCol.columnType} 
!= ${srcColumnType}")
+        if (tgtCol.columnType != tgtColumnType) throw new 
RuntimeException(s"target service column type not matched ${tgtCol.columnType} 
!= ${tgtColumnType}")
+
+        /** create label */
+        Label.findByName(labelName, useCache = false).getOrElse {
+
+          val createdId = insert(labelName, srcServiceId, srcColumnName, 
srcColumnType,
+            tgtServiceId, tgtColumnName, tgtColumnType, isDirected, 
serviceName, serviceId, consistencyLevel,
+            hTableName.getOrElse(service.hTableName), 
hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync, 
compressionAlgorithm).toInt
+
+          val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, 
dataType) =>
+            val labelMeta = LabelMeta.findOrInsert(createdId, propName, 
defaultValue, dataType)
+            (propName -> labelMeta.seq)
+          }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name 
-> labelMeta.seq).toMap
+
+          if (indices.isEmpty) {
+            // make default index with _PK, _timestamp, 0
+            LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, 
LabelIndex.DefaultMetaSeqs.toList, "none")
+          } else {
+            indices.foreach { index =>
+              val metaSeq = index.propNames.map { name => labelMetaMap(name) }
+              LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, 
"none")
+            }
+          }
+
+          val cacheKeys = List(s"id=$createdId", s"label=$labelName")
+          val ret = findByName(labelName, useCache = false).get
+          putsToCache(cacheKeys.map(k => k -> ret))
+          ret
+        }
+      }
+
+    newLabel.getOrElse(throw new RuntimeException("failed to create label"))
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from labels""".map { rs => Label(rs) 
}.list().apply()
+    putsToCache(ls.map { x =>
+      val cacheKey = s"id=${x.id.get}"
+      (cacheKey -> x)
+    })
+    putsToCache(ls.map { x =>
+      val cacheKey = s"label=${x.label}"
+      (cacheKey -> x)
+    })
+  }
+
+  def updateName(oldName: String, newName: String)(implicit session: DBSession 
= AutoSession) = {
+    logger.info(s"rename label: $oldName -> $newName")
+    sql"""update labels set label = ${newName} where label = 
${oldName}""".update.apply()
+  }
+
+  def updateHTableName(labelName: String, newHTableName: String)(implicit 
session: DBSession = AutoSession) = {
+    logger.info(s"update HTable of label $labelName to $newHTableName")
+    val cnt = sql"""update labels set hbase_table_name = $newHTableName where 
label = $labelName""".update().apply()
+    val label = Label.findByName(labelName, useCache = false).get
+
+    val cacheKeys = List(s"id=${label.id}", s"label=${label.label}")
+    cacheKeys.foreach { key =>
+      expireCache(key)
+      expireCaches(key)
+    }
+    cnt
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val label = findById(id)
+    logger.info(s"delete label: $label")
+    val cnt = sql"""delete from labels where id = 
${label.id.get}""".update().apply()
+    val cacheKeys = List(s"id=$id", s"label=${label.label}")
+    cacheKeys.foreach { key =>
+      expireCache(key)
+      expireCaches(key)
+    }
+    cnt
+  }
+}
+
+case class Label(id: Option[Int], label: String,
+                 srcServiceId: Int, srcColumnName: String, srcColumnType: 
String,
+                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: 
String,
+                 isDirected: Boolean = true, serviceName: String, serviceId: 
Int, consistencyLevel: String = "strong",
+                 hTableName: String, hTableTTL: Option[Int],
+                 schemaVersion: String, isAsync: Boolean = false,
+                 compressionAlgorithm: String) extends JSONParser {
+  def metas = LabelMeta.findAllByLabelId(id.get)
+
+  def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
+
+  //  lazy val firstHBaseTableName = 
hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
+  lazy val srcService = Service.findById(srcServiceId)
+  lazy val tgtService = Service.findById(tgtServiceId)
+  lazy val service = Service.findById(serviceId)
+  /**
+   * TODO
+   * change this to apply hbase table from target serviceName
+   */
+  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
+  //  lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, 
hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
+  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
+  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, 
hTableName.split(",").head)
+
+  lazy val srcColumn = ServiceColumn.find(srcServiceId, 
srcColumnName).getOrElse(throw ModelNotFoundException("Source column not 
found"))
+  lazy val tgtColumn = ServiceColumn.find(tgtServiceId, 
tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not 
found"))
+
+  lazy val direction = if (isDirected) "out" else "undirected"
+  lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, 
LabelIndex.DefaultSeq)
+
+  //TODO: Make sure this is correct
+  lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
+  lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
+  lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
+  lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
+  lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && 
idx.id.get != defaultIndex.get.id.get)
+  //      indices filterNot (_.id.get == defaultIndex.get.id.get)
+  lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap
+
+  lazy val metaProps = LabelMeta.reservedMetas.map { m =>
+    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
+    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
+    else m
+  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
+
+  lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
+    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
+    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
+    else m
+  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
+
+  lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
+  lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
+  lazy val metaPropNames = metaProps.map(x => x.name)
+  lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
+
+  /** this is used only by edgeToProps */
+  lazy val metaPropsDefaultMap = (for {
+    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
+    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, 
schemaVersion), prop.dataType)
+  } yield prop.name -> jsValue).toMap
+
+  lazy val metaPropsDefaultMapInner = (for {
+    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, 
schemaVersion), prop.dataType)
+  } yield prop.name -> jsValue).toMap
+
+  def srcColumnWithDir(dir: Int) = {
+    if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn
+  }
+
+  def tgtColumnWithDir(dir: Int) = {
+    if (dir == GraphUtil.directions("out")) tgtColumn else srcColumn
+  }
+
+  def srcTgtColumn(dir: Int) =
+    if (isDirected) {
+      (srcColumnWithDir(dir), tgtColumnWithDir(dir))
+    } else {
+      if (dir == GraphUtil.directions("in")) {
+        (tgtColumn, srcColumn)
+      } else {
+        (srcColumn, tgtColumn)
+      }
+    }
+
+  def init() = {
+    metas
+    metaSeqsToNames
+    service
+    srcColumn
+    tgtColumn
+    defaultIndex
+    indices
+    metaProps
+  }
+
+  //  def srcColumnInnerVal(jsValue: JsValue) = {
+  //    jsValueToInnerVal(jsValue, srcColumnType, version)
+  //  }
+  //  def tgtColumnInnerVal(jsValue: JsValue) = {
+  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
+  //  }
+
+  override def toString(): String = {
+    val orderByKeys = LabelMeta.findAllByLabelId(id.get)
+    super.toString() + orderByKeys.toString()
+  }
+
+  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
+  //    if (scoring.isEmpty) LabelIndex.defaultSeq
+  //    else {
+  //      LabelIndex.findByLabelIdAndSeqs(id.get, 
scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
+  //
+  ////      LabelIndex.findByLabelIdAndSeqs(id.get, 
scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
+  //    }
+  //  }
+  lazy val toJson = Json.obj("labelName" -> label,
+    "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
+    "isDirected" -> isDirected,
+    "serviceName" -> serviceName,
+    "consistencyLevel" -> consistencyLevel,
+    "schemaVersion" -> schemaVersion,
+    "isAsync" -> isAsync,
+    "compressionAlgorithm" -> compressionAlgorithm,
+    "defaultIndex" -> defaultIndex.map(x => x.toJson),
+    "extraIndex" -> extraIndices.map(exIdx => exIdx.toJson),
+    "metaProps" -> metaProps.filter { labelMeta => 
LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson)
+  )
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
new file mode 100644
index 0000000..31e63c3
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
@@ -0,0 +1,143 @@
+package org.apache.s2graph.core.mysqls
+
+/**
+ * Created by shon on 6/3/15.
+ */
+
+import play.api.libs.json.Json
+import scalikejdbc._
+
+object LabelIndex extends Model[LabelIndex] {
+  val DefaultName = "_PK"
+  val DefaultMetaSeqs = Seq(LabelMeta.timeStampSeq)
+  val DefaultSeq = 1.toByte
+  val MaxOrderSeq = 7
+
+  def apply(rs: WrappedResultSet): LabelIndex = {
+    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), 
rs.byte("seq"),
+      rs.string("meta_seqs").split(",").filter(_ != "").map(s => 
s.toByte).toList match {
+        case metaSeqsList => metaSeqsList
+      },
+      rs.string("formulars"))
+  }
+
+  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
+    val cacheKey = "id=" + id
+    withCache(cacheKey) {
+      sql"""select * from label_indices where id = ${id}""".map { rs => 
LabelIndex(rs) }.single.apply
+    }.get
+  }
+
+  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit 
session: DBSession = AutoSession) = {
+    val cacheKey = "labelId=" + labelId
+    if (useCache) {
+      withCaches(cacheKey)( sql"""
+        select * from label_indices where label_id = ${labelId} and seq > 0 
order by seq ASC
+      """.map { rs => LabelIndex(rs) }.list.apply)
+    } else {
+      sql"""
+        select * from label_indices where label_id = ${labelId} and seq > 0 
order by seq ASC
+      """.map { rs => LabelIndex(rs) }.list.apply
+    }
+  }
+
+  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], 
formulars: String)(implicit session: DBSession = AutoSession): Long = {
+    sql"""
+       insert into label_indices(label_id, name, seq, meta_seqs, formulars)
+       values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, 
${formulars})
+    """
+      .updateAndReturnGeneratedKey.apply()
+  }
+
+  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], 
formulars: String)(implicit session: DBSession = AutoSession): LabelIndex = {
+    findByLabelIdAndSeqs(labelId, metaSeqs) match {
+      case Some(s) => s
+      case None =>
+        val orders = findByLabelIdAll(labelId, false)
+        val seq = (orders.size + 1).toByte
+        assert(seq <= MaxOrderSeq)
+        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars)
+        val cacheKeys = List(s"labelId=$labelId:seq=$seq",
+          s"labelId=$labelId:seqs=$metaSeqs", s"labelId=$labelId:seq=$seq", 
s"id=$createdId")
+        cacheKeys.foreach { key =>
+          expireCache(key)
+          expireCaches(key)
+        }
+
+        findByLabelIdAndSeq(labelId, seq).get
+    }
+  }
+
+  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte])(implicit session: 
DBSession = AutoSession): Option[LabelIndex] = {
+    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",")
+    withCache(cacheKey) {
+      sql"""
+      select * from label_indices where label_id = ${labelId} and meta_seqs = 
${seqs.mkString(",")}
+      """.map { rs => LabelIndex(rs) }.single.apply
+    }
+  }
+
+  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
+    //    val cacheKey = s"labelId=$labelId:seq=$seq"
+    val cacheKey = "labelId=" + labelId + ":seq=" + seq
+    if (useCache) {
+      withCache(cacheKey)( sql"""
+      select * from label_indices where label_id = ${labelId} and seq = ${seq}
+      """.map { rs => LabelIndex(rs) }.single.apply)
+    } else {
+      sql"""
+      select * from label_indices where label_id = ${labelId} and seq = ${seq}
+      """.map { rs => LabelIndex(rs) }.single.apply
+    }
+  }
+
+  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
+    val labelIndex = findById(id)
+    val seqs = labelIndex.metaSeqs.mkString(",")
+    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
+    sql"""delete from label_indices where id = ${id}""".execute.apply()
+
+    val cacheKeys = List(s"id=$id", s"labelId=$labelId", 
s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs")
+    cacheKeys.foreach { key =>
+      expireCache(key)
+      expireCaches(key)
+    }
+  }
+
+  def findAll()(implicit session: DBSession = AutoSession) = {
+    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) 
}.list.apply
+    putsToCache(ls.map { x =>
+      var cacheKey = s"id=${x.id.get}"
+      (cacheKey -> x)
+    })
+    putsToCache(ls.map { x =>
+      var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}"
+      (cacheKey -> x)
+    })
+    putsToCache(ls.map { x =>
+      var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}"
+      (cacheKey -> x)
+    })
+    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
+      val cacheKey = s"labelId=${labelId}"
+      (cacheKey -> ls)
+    }.toList)
+  }
+}
+
+/**
+ * formular
+ * ex1): w1, w2, w3
+ * ex2): 1.5 * w1^2 + 3.4 * (w1 * w2), w2, w1
+ */
+
+case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, 
metaSeqs: Seq[Byte], formulars: String) {
+  lazy val label = Label.findById(labelId)
+  lazy val metas = label.metaPropsMap
+  lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => 
label.metaPropsMap.get(metaSeq))
+  lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
+  lazy val toJson = Json.obj(
+    "name" -> name,
+    "propNames" -> sortKeyTypes.map(x => x.name)
+  )
+}

Reply via email to