http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index ef40688..b481880 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -21,48 +21,46 @@ package org.apache.s2graph.core
 
 import com.google.common.hash.Hashing
 import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.DuplicatePolicy.DuplicatePolicy
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
-import org.apache.s2graph.core.types.{HBaseSerializable, InnerVal, InnerValLike, LabelWithDirection}
+import org.apache.s2graph.core.rest.TemplateHelper
+import org.apache.s2graph.core.storage.StorageSerializable._
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs, LabelWithDirection}
 import org.hbase.async.ColumnRangeFilter
-import play.api.libs.json.{JsNull, JsNumber, JsValue, Json}
+import play.api.libs.json.{JsString, JsNull, JsValue, Json}
 
-import scala.util.hashing.MurmurHash3
 import scala.util.{Success, Try}
 
 object Query {
   val initialScore = 1.0
   lazy val empty = Query()
-
+  def apply(query: Query): Query = {
+    Query(query.vertices, query.steps, query.queryOption, query.jsonQuery)
+  }
   def toQuery(srcVertices: Seq[Vertex], queryParam: QueryParam) = Query(srcVertices, Vector(Step(List(queryParam))))
 
-  object DuplicatePolicy extends Enumeration {
-    type DuplicatePolicy = Value
-    val First, Sum, CountSum, Raw = Value
-
-    def apply(policy: String): Value = {
-      policy match {
-        case "sum" => Query.DuplicatePolicy.Sum
-        case "countSum" => Query.DuplicatePolicy.CountSum
-        case "raw" => Query.DuplicatePolicy.Raw
-        case _ => DuplicatePolicy.First
-      }
-    }
-  }
 }
 
+case class MinShouldMatchParam(prop: String, count: Int, terms: Set[Any])
+
 object GroupBy {
   val Empty = GroupBy()
 }
 case class GroupBy(keys: Seq[String] = Nil,
-                   limit: Int = Int.MaxValue)
+                   limit: Int = Int.MaxValue,
+                   minShouldMatch: Option[MinShouldMatchParam]= None)
 
 case class MultiQuery(queries: Seq[Query],
                       weights: Seq[Double],
                       queryOption: QueryOption,
                       jsonQuery: JsValue = JsNull)
 
+object QueryOption {
+  val DefaultAscendingVals: Seq[Boolean] = Seq(false, false)
+}
+
 case class QueryOption(removeCycle: Boolean = false,
                        selectColumns: Seq[String] = Seq.empty,
                        groupBy: GroupBy = GroupBy.Empty,
@@ -75,9 +73,27 @@ case class QueryOption(removeCycle: Boolean = false,
                        returnAgg: Boolean = true,
                        scoreThreshold: Double = Double.MinValue,
                        returnDegree: Boolean = true,
-                       impIdOpt: Option[String] = None) {
+                       impIdOpt: Option[String] = None,
+                       shouldPropagateScore: Boolean = true,
+                       ignorePrevStepCache: Boolean = false) {
   val orderByKeys = orderByColumns.map(_._1)
   val ascendingVals = orderByColumns.map(_._2)
+  val selectColumnsMap = selectColumns.map { c => c -> true } .toMap
+  val scoreFieldIdx = orderByKeys.zipWithIndex.find(t => t._1 == "score").map(_._2).getOrElse(-1)
+  val (edgeSelectColumns, propsSelectColumns) = selectColumns.partition(c => 
LabelMeta.defaultRequiredMetaNames.contains(c))
+  /** */
+  val edgeSelectColumnsFiltered = edgeSelectColumns
+//  val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => 
groupBy.keys.contains(c))
+  lazy val cacheKeyBytes: Array[Byte] = {
+    val selectBytes = Bytes.toBytes(selectColumns.toString)
+    val groupBytes = Bytes.toBytes(groupBy.keys.toString)
+    val orderByBytes = Bytes.toBytes(orderByColumns.toString)
+    val filterOutBytes = 
filterOutQuery.map(_.fullCacheBytes).getOrElse(Array.empty[Byte])
+    val returnTreeBytes = Bytes.toBytes(returnTree)
+
+    Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, 
returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add)
+  }
+
 }
 
 case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
@@ -85,57 +101,13 @@ case class Query(vertices: Seq[Vertex] = Seq.empty[Vertex],
                  queryOption: QueryOption = QueryOption(),
                  jsonQuery: JsValue = JsNull) {
 
-  val removeCycle = queryOption.removeCycle
-  val selectColumns = queryOption.selectColumns
-  val groupBy = queryOption.groupBy
-  val orderByColumns = queryOption.orderByColumns
-  val filterOutQuery = queryOption.filterOutQuery
-  val filterOutFields = queryOption.filterOutFields
-  val withScore = queryOption.withScore
-  val returnTree = queryOption.returnTree
-  val limitOpt = queryOption.limitOpt
-  val returnAgg = queryOption.returnAgg
-  val returnDegree = queryOption.returnDegree
-
-  def cacheKeyBytes: Array[Byte] = {
-    val selectBytes = Bytes.toBytes(queryOption.selectColumns.toString)
-    val groupBytes = Bytes.toBytes(queryOption.groupBy.keys.toString)
-    val orderByBytes = Bytes.toBytes(queryOption.orderByColumns.toString)
-    val filterOutBytes = 
queryOption.filterOutQuery.map(_.cacheKeyBytes).getOrElse(Array.empty[Byte])
-    val returnTreeBytes = Bytes.toBytes(queryOption.returnTree)
-
-    Seq(selectBytes, groupBytes, orderByBytes, filterOutBytes, 
returnTreeBytes).foldLeft(Array.empty[Byte])(Bytes.add)
-  }
-
-  lazy val selectColumnsSet = queryOption.selectColumns.map { c =>
-    if (c == "_from") "from"
-    else if (c == "_to") "to"
-    else c
-  }.toSet
-
-  /** return logical query id without considering parameter values */
-  def templateId(): JsValue = {
-    Json.toJson(for {
-      step <- steps
-      queryParam <- step.queryParams.sortBy(_.labelWithDir.labelId)
-    } yield {
-        Json.obj("label" -> queryParam.label.label, "direction" -> 
GraphUtil.fromDirection(queryParam.labelWithDir.dir))
-      })
-  }
-
-  def impressionId(): JsNumber = {
-    val hash = MurmurHash3.stringHash(templateId().toString())
-    JsNumber(hash)
-  }
-
-  def cursorStrings(): Seq[Seq[String]] = {
-    //Don`t know how to replace all cursor keys in json
-    steps.map { step =>
-      step.queryParams.map { queryParam =>
-        queryParam.cursorOpt.getOrElse("")
-      }
-    }
+  lazy val fullCacheBytes = {
+    val srcBytes = vertices.map(_.innerId.bytes).foldLeft(Array.empty[Byte])(Bytes.add)
+    val stepBytes = steps.map(_.cacheKeyBytes).foldLeft(Array.empty[Byte])(Bytes.add)
+    val queryOptionBytes = queryOption.cacheKeyBytes
+    Bytes.add(srcBytes, stepBytes, queryOptionBytes)
   }
+  lazy val fullCacheKey: Long = Hashing.murmur3_128().hashBytes(fullCacheBytes).asLong()
 }
 
 object EdgeTransformer {
@@ -148,7 +120,7 @@ object EdgeTransformer {
  * TODO: step wise outputFields should be used with nextStepLimit, 
nextStepThreshold.
  * @param jsValue
  */
-case class EdgeTransformer(queryParam: QueryParam, jsValue: JsValue) {
+case class EdgeTransformer(jsValue: JsValue) {
   val Delimiter = "\\$"
   val targets = jsValue.asOpt[List[Vector[String]]].toList
   val fieldsLs = for {
@@ -159,7 +131,8 @@ case class EdgeTransformer(queryParam: QueryParam, jsValue: 
JsValue) {
 
   def toHashKeyBytes: Array[Byte] = if (isDefault) Array.empty[Byte] else 
Bytes.toBytes(jsValue.toString)
 
-  def replace(fmt: String,
+  def replace(queryParam: QueryParam,
+              fmt: String,
               values: Seq[InnerValLike],
               nextStepOpt: Option[Step]): Seq[InnerValLike] = {
 
@@ -189,19 +162,19 @@ case class EdgeTransformer(queryParam: QueryParam, 
jsValue: JsValue) {
     }
   }
 
-  def toInnerValOpt(edge: Edge, fieldName: String): Option[InnerValLike] = {
+  def toInnerValOpt(queryParam: QueryParam, edge: Edge, fieldName: String): 
Option[InnerValLike] = {
     fieldName match {
       case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
       case LabelMeta.from.name => Option(edge.srcVertex.innerId)
       case _ =>
         for {
           labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName)
-          value <- edge.propsWithTs.get(labelMeta.seq)
+          value <- edge.propsWithTs.get(labelMeta)
         } yield value.innerVal
     }
   }
 
-  def transform(edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = {
+  def transform(queryParam: QueryParam, edge: Edge, nextStepOpt: 
Option[Step]): Seq[Edge] = {
     if (isDefault) Seq(edge)
     else {
       val edges = for {
@@ -209,10 +182,10 @@ case class EdgeTransformer(queryParam: QueryParam, 
jsValue: JsValue) {
         innerVal <- {
           if (fields.size == 1) {
             val fieldName = fields.head
-            toInnerValOpt(edge, fieldName).toSeq
+            toInnerValOpt(queryParam, edge, fieldName).toSeq
           } else {
             val fmt +: fieldNames = fields
-            replace(fmt, fieldNames.flatMap(fieldName => toInnerValOpt(edge, 
fieldName)), nextStepOpt)
+            replace(queryParam, fmt, fieldNames.flatMap(fieldName => 
toInnerValOpt(queryParam, edge, fieldName)), nextStepOpt)
           }
         }
       } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = 
Option(edge))
@@ -227,17 +200,18 @@ object Step {
   val Delimiter = "|"
 }
 
-case class Step(queryParams: List[QueryParam],
+case class Step(queryParams: Seq[QueryParam],
                 labelWeights: Map[Int, Double] = Map.empty,
-                //                scoreThreshold: Double = 0.0,
                 nextStepScoreThreshold: Double = 0.0,
                 nextStepLimit: Int = -1,
-                cacheTTL: Long = -1) {
+                cacheTTL: Long = -1,
+                groupBy: GroupBy = GroupBy.Empty) {
 
-  lazy val excludes = queryParams.filter(_.exclude)
-  lazy val includes = queryParams.filterNot(_.exclude)
-  lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
+//  lazy val excludes = queryParams.filter(_.exclude)
+//  lazy val includes = queryParams.filterNot(_.exclude)
+//  lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> 
true).toMap
 
+  lazy val cacheKeyBytes = queryParams.map(_.toCacheKeyRaw(Array.empty[Byte])).foldLeft(Array.empty[Byte])(Bytes.add)
   def toCacheKey(lss: Seq[Long]): Long = 
Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong()
 //    MurmurHash3.bytesHash(toCacheKeyRaw(lss))
 
@@ -265,359 +239,202 @@ case class VertexParam(vertices: Seq[Vertex]) {
 
 }
 
-//object RankParam {
-//  def apply(labelId: Int, keyAndWeights: Seq[(Byte, Double)]) = {
-//    new RankParam(labelId, keyAndWeights)
-//  }
-//}
+object RankParam {
+  val Default = RankParam()
+}
 
-case class RankParam(labelId: Int, var keySeqAndWeights: Seq[(Byte, Double)] = 
Seq.empty[(Byte, Double)]) {
+case class RankParam(keySeqAndWeights: Seq[(LabelMeta, Double)] = 
Seq((LabelMeta.count, 1.0))) {
   // empty => Count
   lazy val rankKeysWeightsMap = keySeqAndWeights.toMap
 
-  def defaultKey() = {
-    this.keySeqAndWeights = List((LabelMeta.countSeq, 1.0))
-    this
-  }
-
   def toHashKeyBytes(): Array[Byte] = {
     var bytes = Array.empty[Byte]
-    keySeqAndWeights.map { case (key, weight) =>
-      bytes = Bytes.add(bytes, Array.fill(1)(key), Bytes.toBytes(weight))
+    keySeqAndWeights.map { case (labelMeta, weight) =>
+      bytes = Bytes.add(bytes, Array.fill(1)(labelMeta.seq), 
Bytes.toBytes(weight))
     }
     bytes
   }
 }
-case class S2Request(labelName: String,
-                     direction: String = "out",
-                     ts: Long = System.currentTimeMillis(),
-                     options: Map[String, Any] = Map.empty) {
-  val label = Label.findByName(labelName).getOrElse(throw new 
LabelNotExistException(labelName))
-  val dir = GraphUtil.toDir(direction).getOrElse(throw new 
RuntimeException(s"$direction is not supported."))
-  val labelWithDir = LabelWithDirection(label.id.get, dir)
-  //TODO: need to merge options into queryParam.
-  val queryParam = QueryParam(labelWithDir, ts)
-
-}
 object QueryParam {
-  lazy val Empty = QueryParam(LabelWithDirection(0, 0))
+  lazy val Empty = QueryParam(labelName = "")
   lazy val DefaultThreshold = Double.MinValue
   val Delimiter = ","
   val maxMetaByte = (-1).toByte
   val fillArray = Array.fill(100)(maxMetaByte)
-}
 
-case class QueryParam(labelWithDir: LabelWithDirection, timestamp: Long = 
System.currentTimeMillis()) {
-
-  import HBaseSerializable._
-  import Query.DuplicatePolicy
-  import Query.DuplicatePolicy._
-
-  lazy val label = Label.findById(labelWithDir.labelId)
-  val DefaultKey = LabelIndex.DefaultSeq
-  val fullKey = DefaultKey
-
-  var labelOrderSeq = fullKey
-
-  var sample = -1
-  var limit = 10
-  var offset = 0
-  var rank = new RankParam(labelWithDir.labelId, List(LabelMeta.countSeq -> 1))
-
-  var duration: Option[(Long, Long)] = None
-  var isInverted: Boolean = false
-
-  var columnRangeFilter: ColumnRangeFilter = null
-
-  var hasFilters: Map[Byte, InnerValLike] = Map.empty[Byte, InnerValLike]
-  var where: Try[Where] = Success(WhereParser.success)
-  var whereRawOpt: Option[String] = None
-  var duplicatePolicy = DuplicatePolicy.First
-  var rpcTimeoutInMillis = 1000
-  var maxAttempt = 2
-  var includeDegree = false
-  var tgtVertexInnerIdOpt: Option[InnerValLike] = None
-  var cacheTTLInMillis: Long = -1L
-  var threshold = QueryParam.DefaultThreshold
-  var timeDecay: Option[TimeDecay] = None
-  var transformer: EdgeTransformer = EdgeTransformer(this, 
EdgeTransformer.DefaultJson)
-  var scorePropagateOp: String = "multiply"
-  var scorePropagateShrinkage: Long = 500
-  var exclude = false
-  var include = false
-  var shouldNormalize= false
-  var cursorOpt: Option[String] = None
-
-  var columnRangeFilterMinBytes = Array.empty[Byte]
-  var columnRangeFilterMaxBytes = Array.empty[Byte]
-
-  lazy val srcColumnWithDir = label.srcColumnWithDir(labelWithDir.dir)
-  lazy val tgtColumnWithDir = label.tgtColumnWithDir(labelWithDir.dir)
-
-  def toBytes(idxSeq: Byte, offset: Int, limit: Int, isInverted: Boolean): 
Array[Byte] = {
-    val front = Array[Byte](idxSeq, if (isInverted) 1.toByte else 0.toByte)
-    Bytes.add(front, Bytes.toBytes((offset.toLong << 32 | limit)))
+  def apply(labelWithDirection: LabelWithDirection): QueryParam = {
+    val label = Label.findById(labelWithDirection.labelId)
+    val direction = GraphUtil.fromDirection(labelWithDirection.dir)
+    QueryParam(labelName = label.label, direction = direction)
   }
-
-  /**
-   * consider only I/O specific parameters.
-   * properties that is used on Graph.filterEdges should not be considered.
-   * @param bytes
-   * @return
-   */
-  def toCacheKey(bytes: Array[Byte]): Long = {
-    val hashBytes = toCacheKeyRaw(bytes)
-    Hashing.murmur3_128().hashBytes(hashBytes).asLong()
-//    MurmurHash3.bytesHash(hashBytes)
-  }
-
-  def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = {
-    val transformBytes = transformer.toHashKeyBytes
+}
+case class QueryParam(labelName: String,
+                        direction: String = "out",
+                        offset: Int = 0,
+                        limit: Int = 100,
+                        sample: Int = -1,
+                        maxAttempt: Int = 2,
+                        rpcTimeout: Int = 1000,
+                        cacheTTLInMillis: Long = -1L,
+                        indexName: String = LabelIndex.DefaultName,
+                        where: Try[Where] = Success(WhereParser.success),
+                        timestamp: Long = System.currentTimeMillis(),
+                        threshold: Double = Double.MinValue,
+                        rank: RankParam = RankParam.Default,
+                        intervalOpt: Option[((Seq[(String, JsValue)]), 
Seq[(String, JsValue)])] = None,
+                        durationOpt: Option[(Long, Long)] = None,
+                        exclude: Boolean = false,
+                        include: Boolean = false,
+                        has: Map[String, Any] = Map.empty,
+                        duplicatePolicy: DuplicatePolicy = 
DuplicatePolicy.First,
+                        includeDegree: Boolean = false,
+                        scorePropagateShrinkage: Long = 500L,
+                        scorePropagateOp: String = "multiply",
+                        shouldNormalize: Boolean = false,
+                        whereRawOpt: Option[String] = None,
+                        cursorOpt: Option[String] = None,
+                        tgtVertexIdOpt: Option[Any] = None,
+                        edgeTransformer: EdgeTransformer = 
EdgeTransformer(EdgeTransformer.DefaultJson),
+                        timeDecay: Option[TimeDecay] = None) {
+  import JSONParser._
+
+  //TODO: implement this.
+  lazy val whereHasParent = true
+
+  lazy val label = Label.findByName(labelName).getOrElse(throw LabelNotExistException(labelName))
+  lazy val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"not supported direction: $direction"))
+
+  lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
+  lazy val labelOrderSeq =
+    if (indexName == LabelIndex.DefaultName) LabelIndex.DefaultSeq
+    else label.indexNameMap.getOrElse(indexName, throw new 
RuntimeException(s"$indexName indexName is not found.")).seq
+
+  lazy val tgtVertexInnerIdOpt = tgtVertexIdOpt.map { id =>
+    val tmp = label.tgtColumnWithDir(dir)
+    toInnerVal(id, tmp.columnType, tmp.schemaVersion)
+  }
+
+  def buildInterval(edgeOpt: Option[Edge]) = intervalOpt match {
+    case None => Array.empty[Byte] -> Array.empty[Byte]
+    case Some(interval) =>
+      val (froms, tos) = interval
+
+      val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
+      val (maxBytes, minBytes) = paddingInterval(len, froms, tos, edgeOpt)
+
+      maxBytes -> minBytes
+  }
+
+  lazy val isSnapshotEdge = tgtVertexInnerIdOpt.isDefined
+
+  /** since degree info is located on first always */
+  lazy val (innerOffset, innerLimit) = if (intervalOpt.isEmpty) {
+    if (offset == 0) (offset, if (limit == Int.MaxValue) limit else limit + 1)
+    else (offset + 1, limit)
+  } else (offset, limit)
+
+  lazy val optionalCacheKey: Array[Byte] = {
+    val transformBytes = edgeTransformer.toHashKeyBytes
     //TODO: change this to binrary format.
     val whereBytes = Bytes.toBytes(whereRawOpt.getOrElse(""))
-    val durationBytes = duration.map { case (min, max) =>
+    val durationBytes = durationOpt.map { case (min, max) =>
       val minTs = min / cacheTTLInMillis
       val maxTs = max / cacheTTLInMillis
       Bytes.add(Bytes.toBytes(minTs), Bytes.toBytes(maxTs))
     } getOrElse Array.empty[Byte]
-//    Bytes.toBytes(duration.toString)
+
     val conditionBytes = Bytes.add(transformBytes, whereBytes, durationBytes)
-    Bytes.add(Bytes.add(bytes, labelWithDir.bytes, toBytes(labelOrderSeq, 
offset, limit, isInverted)), rank.toHashKeyBytes(),
-      Bytes.add(columnRangeFilterMinBytes, columnRangeFilterMaxBytes, 
conditionBytes))
-  }
 
-  def isInverted(isInverted: Boolean): QueryParam = {
-    this.isInverted = isInverted
-    this
+    // Interval cache bytes is moved to fetch method
+    Bytes.add(Bytes.add(toBytes(offset, limit), rank.toHashKeyBytes()), 
conditionBytes)
   }
 
-  def labelOrderSeq(labelOrderSeq: Byte): QueryParam = {
-    this.labelOrderSeq = labelOrderSeq
-    this
+  def toBytes(offset: Int, limit: Int): Array[Byte] = {
+    Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit))
   }
 
-  def sample(n: Int): QueryParam = {
-    this.sample = n
-    this
+  def toCacheKey(bytes: Array[Byte]): Long = {
+    val hashBytes = toCacheKeyRaw(bytes)
+    Hashing.murmur3_128().hashBytes(hashBytes).asLong()
   }
 
-  def limit(offset: Int, limit: Int): QueryParam = {
-    /** since degree info is located on first always */
-    this.limit = limit
-    this.offset = offset
-
-    if (this.columnRangeFilter == null) {
-      if (offset == 0) this.limit = limit + 1
-      else this.offset = offset + 1
-    }
-    //    this.columnPaginationFilter = new ColumnPaginationFilter(this.limit, 
this.offset)
-    this
+  def toCacheKeyRaw(bytes: Array[Byte]): Array[Byte] = {
+    Bytes.add(bytes, optionalCacheKey)
   }
 
-  def interval(fromTo: Option[(Seq[(Byte, InnerValLike)], Seq[(Byte, 
InnerValLike)])]): QueryParam = {
-    fromTo match {
-      case Some((from, to)) => interval(from, to)
-      case _ => this
-    }
-  }
+  private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: 
Option[Edge]): Seq[(LabelMeta, InnerValLike)] = {
+    kvs.map { case (propKey, propValJs) =>
+      propValJs match {
+        case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") =>
+          val parentLen = in.split("_parent.").length - 1
+          val edge = (0 until parentLen).foldLeft(edgeOpt.get) { case (acc, _) 
=> acc.parentEdges.head.edge }
 
-  def paddingInterval(len: Byte, from: Seq[(Byte, InnerValLike)], to: 
Seq[(Byte, InnerValLike)]) = {
-    val fromVal = Bytes.add(propsToBytes(from), QueryParam.fillArray)
-    val toVal = propsToBytes(to)
+          val timePivot = edge.ts
+          val replaced = TemplateHelper.replaceVariable(timePivot, in).trim
 
-    toVal(0) = len
-    fromVal(0) = len
+          val (_propKey, _padding) = replaced.span(ch => !ch.isDigit && ch != 
'-' && ch != '+' && ch != ' ')
+          val propKey = _propKey.split("_parent.").last
+          val padding = Try(_padding.trim.toLong).getOrElse(0L)
 
-    val minMax = (toVal, fromVal) // inverted
-    minMax
-  }
+          val labelMeta = edge.label.metaPropsInvMap.getOrElse(propKey, throw 
new RuntimeException(s"$propKey not found in ${edge} labelMetas."))
 
-  def interval(from: Seq[(Byte, InnerValLike)], to: Seq[(Byte, 
InnerValLike)]): QueryParam = {
-    val len = label.indicesMap(labelOrderSeq).sortKeyTypes.size.toByte
-    val (minBytes, maxBytes) = paddingInterval(len, from, to)
+          val propVal =
+            if (InnerVal.isNumericType(labelMeta.dataType)) {
+              
InnerVal.withLong(edge.props(labelMeta).value.asInstanceOf[BigDecimal].longValue()
 + padding, label.schemaVersion)
+            } else {
+              edge.props(labelMeta)
+            }
 
-    this.columnRangeFilterMaxBytes = maxBytes
-    this.columnRangeFilterMinBytes = minBytes
-    this.columnRangeFilter = new ColumnRangeFilter(minBytes, true, maxBytes, 
true)
-    this
-  }
+          labelMeta -> propVal
+        case _ =>
+          val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw new 
RuntimeException(s"$propKey not found in labelMetas."))
+          val propVal = jsValueToInnerVal(propValJs, labelMeta.dataType, 
label.schemaVersion)
 
-  def duration(minMaxTs: Option[(Long, Long)]): QueryParam = {
-    minMaxTs match {
-      case Some((minTs, maxTs)) => duration(minTs, maxTs)
-      case _ => this
+          labelMeta -> propVal.get
+      }
     }
   }
 
-  def duration(minTs: Long, maxTs: Long): QueryParam = {
-    this.duration = Some((minTs, maxTs))
-    this
-  }
-
-  def rank(r: RankParam): QueryParam = {
-    this.rank = r
-    this
-  }
-
-  def exclude(filterOut: Boolean): QueryParam = {
-    this.exclude = filterOut
-    this
-  }
-
-  def include(filterIn: Boolean): QueryParam = {
-    this.include = filterIn
-    this
-  }
-
-  def has(hasFilters: Map[Byte, InnerValLike]): QueryParam = {
-    this.hasFilters = hasFilters
-    this
-  }
-
-  def where(whereTry: Try[Where]): QueryParam = {
-    this.where = whereTry
-    this
-  }
-
-  def duplicatePolicy(policy: Option[DuplicatePolicy]): QueryParam = {
-    this.duplicatePolicy = policy.getOrElse(DuplicatePolicy.First)
-    this
-  }
+  def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: 
Seq[(String, JsValue)], edgeOpt: Option[Edge] = None) = {
+    val fromInnerVal = convertToInner(froms, edgeOpt)
+    val toInnerVal = convertToInner(tos, edgeOpt)
 
-  def rpcTimeout(millis: Int): QueryParam = {
-    this.rpcTimeoutInMillis = millis
-    this
-  }
-
-  def maxAttempt(attempt: Int): QueryParam = {
-    this.maxAttempt = attempt
-    this
-  }
-
-  def includeDegree(includeDegree: Boolean): QueryParam = {
-    this.includeDegree = includeDegree
-    this
-  }
-
-  def scorePropagateShrinkage(scorePropagateShrinkage: Long): QueryParam = {
-    this.scorePropagateShrinkage = scorePropagateShrinkage
-    this
-  }
+    val fromVal = Bytes.add(propsToBytes(fromInnerVal), QueryParam.fillArray)
+    val toVal = propsToBytes(toInnerVal)
 
-  def tgtVertexInnerIdOpt(other: Option[InnerValLike]): QueryParam = {
-    this.tgtVertexInnerIdOpt = other
-    this
-  }
+    toVal(0) = len
+    fromVal(0) = len
 
-  def cacheTTLInMillis(other: Long): QueryParam = {
-    this.cacheTTLInMillis = other
-    this
+    val minMax = (toVal, fromVal) // inverted
+    minMax
   }
 
-  def timeDecay(other: Option[TimeDecay]): QueryParam = {
-    this.timeDecay = other
-    this
+  def toLabelMetas(names: Seq[String]): Set[LabelMeta] = {
+    val m = for {
+      name <- names
+      labelMeta <- label.metaPropsInvMap.get(name)
+    } yield labelMeta
+    m.toSet
   }
+}
 
-  def threshold(other: Double): QueryParam = {
-    this.threshold = other
-    this
-  }
+object DuplicatePolicy extends Enumeration {
+  type DuplicatePolicy = Value
+  val First, Sum, CountSum, Raw = Value
 
-  def transformer(other: Option[JsValue]): QueryParam = {
-    other match {
-      case Some(js) => this.transformer = EdgeTransformer(this, js)
-      case None =>
+  def apply(policy: String): Value = {
+    policy match {
+      case "sum" => DuplicatePolicy.Sum
+      case "countSum" => DuplicatePolicy.CountSum
+      case "raw" => DuplicatePolicy.Raw
+      case _ => DuplicatePolicy.First
     }
-    this
-  }
-
-  def scorePropagateOp(scorePropagateOp: String): QueryParam = {
-    this.scorePropagateOp = scorePropagateOp
-    this
-  }
-
-  def shouldNormalize(shouldNormalize: Boolean): QueryParam = {
-    this.shouldNormalize = shouldNormalize
-    this
   }
-
-  def whereRawOpt(sqlOpt: Option[String]): QueryParam = {
-    this.whereRawOpt = sqlOpt
-    this
-  }
-
-  def cursorOpt(cursorOpt: Option[String]): QueryParam = {
-    this.cursorOpt = cursorOpt
-    this
-  }
-
-  def isSnapshotEdge = tgtVertexInnerIdOpt.isDefined
-
-  override def toString = {
-    List(label.label, labelOrderSeq, offset, limit, rank,
-      duration, isInverted, exclude, include, hasFilters).mkString("\t")
-    //      duration, isInverted, exclude, include, hasFilters, 
outputFields).mkString("\t")
-  }
-
-  //
-  //  def buildGetRequest(srcVertex: Vertex) = {
-  //    val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-  //    val (srcInnerId, tgtInnerId) = tgtVertexInnerIdOpt match {
-  //      case Some(tgtVertexInnerId) => // _to is given.
-  //        /** we use toInvertedEdgeHashLike so dont need to swap src, tgt */
-  //        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-  //        val tgt = InnerVal.convertVersion(tgtVertexInnerId, 
tgtColumn.columnType, label.schemaVersion)
-  //        (src, tgt)
-  //      case None =>
-  //        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-  //        (src, src)
-  //    }
-  //
-  //    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), 
TargetVertexId(tgtColumn.id.get, tgtInnerId))
-  //    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
-  //    val edge = Edge(srcV, tgtV, labelWithDir)
-  //
-  //    val get = if (tgtVertexInnerIdOpt.isDefined) {
-  //      val snapshotEdge = edge.toInvertedEdgeHashLike
-  //      val kv = snapshotEdge.kvs.head
-  //      new GetRequest(label.hbaseTableName.getBytes, kv.row, edgeCf, 
kv.qualifier)
-  //    } else {
-  //      val indexedEdgeOpt = edge.edgesWithIndex.find(e => e.labelIndexSeq 
== labelOrderSeq)
-  //      assert(indexedEdgeOpt.isDefined)
-  //      val indexedEdge = indexedEdgeOpt.get
-  //      val kv = indexedEdge.kvs.head
-  //      val table = label.hbaseTableName.getBytes
-  //        //kv.table //
-  //      val rowKey = kv.row // indexedEdge.rowKey.bytes
-  //      val cf = edgeCf
-  //      new GetRequest(table, rowKey, cf)
-  //    }
-  //
-  //    val (minTs, maxTs) = duration.getOrElse((0L, Long.MaxValue))
-  //
-  //    get.maxVersions(1)
-  //    get.setFailfast(true)
-  //    get.setMaxResultsPerColumnFamily(limit)
-  //    get.setRowOffsetPerColumnFamily(offset)
-  //    get.setMinTimestamp(minTs)
-  //    get.setMaxTimestamp(maxTs)
-  //    get.setTimeout(rpcTimeoutInMillis)
-  //    if (columnRangeFilter != null) get.setFilter(columnRangeFilter)
-  //    //    get.setMaxAttempt(maxAttempt.toByte)
-  //    //    get.setRpcTimeout(rpcTimeoutInMillis)
-  //
-  //    //    if (columnRangeFilter != null) get.filter(columnRangeFilter)
-  //    //    logger.debug(s"Get: $get, $offset, $limit")
-  //
-  //    get
-  //  }
 }
-
 case class TimeDecay(initial: Double = 1.0,
                      lambda: Double = 0.1,
                      timeUnit: Double = 60 * 60 * 24,
-                     labelMetaSeq: Byte = LabelMeta.timeStampSeq) {
+                     labelMeta: LabelMeta = LabelMeta.timestamp) {
   def decay(diff: Double): Double = {
     //FIXME
     val ret = initial * Math.pow(1.0 - lambda, diff / timeUnit)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 5b2622f..dd7e45d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -19,71 +19,106 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Seq, mutable}
 
 object QueryResult {
-  def fromVertices(query: Query): StepInnerResult = {
+  def fromVertices(graph: Graph,
+                   query: Query): StepResult = {
     if (query.steps.isEmpty || query.steps.head.queryParams.isEmpty) {
-      StepInnerResult.Empty
+      StepResult.Empty
     } else {
       val queryParam = query.steps.head.queryParams.head
       val label = queryParam.label
       val currentTs = System.currentTimeMillis()
-      val propsWithTs = Map(LabelMeta.timeStampSeq ->
+      val propsWithTs = Map(LabelMeta.timestamp ->
         InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs))
-      val edgeWithScoreLs = for {
+      val edgeWithScores = for {
         vertex <- query.vertices
       } yield {
-          val edge = Edge(vertex, vertex, queryParam.labelWithDir, propsWithTs 
= propsWithTs)
-          val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore)
+          val edge = Edge(vertex, vertex, label, queryParam.labelWithDir.dir, 
propsWithTs = propsWithTs)
+          val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, 
queryParam.label)
           edgeWithScore
         }
-      StepInnerResult(edgesWithScoreLs = edgeWithScoreLs, Nil, false)
+      StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = 
Nil, false)
     }
   }
 }
-/** inner traverse */
-object StepInnerResult {
-  val Failure = StepInnerResult(Nil, Nil, true)
-  val Empty = StepInnerResult(Nil, Nil, false)
-}
-case class StepInnerResult(edgesWithScoreLs: Seq[EdgeWithScore],
-                           degreeEdges: Seq[EdgeWithScore],
-                           isFailure: Boolean = false) {
-  val isEmpty = edgesWithScoreLs.isEmpty
-}
 
 case class QueryRequest(query: Query,
                         stepIdx: Int,
                         vertex: Vertex,
-                        queryParam: QueryParam)
-
+                        queryParam: QueryParam,
+                        prevStepScore: Double = 1.0,
+                        labelWeight: Double = 1.0) {
+  val nextStepOpt =
+    if (stepIdx < query.steps.size - 1) Option(query.steps(stepIdx + 1))
+    else None
+}
 
-case class QueryResult(edgeWithScoreLs: Seq[EdgeWithScore] = Nil,
-                       tailCursor: Array[Byte] = Array.empty,
-                       timestamp: Long = System.currentTimeMillis(),
-                       isFailure: Boolean = false)
+trait WithScore[T] {
+  def score(me: T): Double
+  def withNewScore(me: T, newScore: Double): T
+}
 
-case class EdgeWithScore(edge: Edge, score: Double)
+object WithScore {
+  implicit val impEdgeWithScore = new WithScore[EdgeWithScore] {
+    override def score(me: EdgeWithScore): Double = me.score
 
+    override def withNewScore(me: EdgeWithScore, newScore: Double): 
EdgeWithScore = me.copy(score = newScore)
+  }
+}
 
+case class EdgeWithScore(edge: Edge,
+                         score: Double,
+                         label: Label,
+                         orderByValues: (Any, Any, Any, Any) = 
StepResult.EmptyOrderByValues,
+                         groupByValues: Seq[Option[Any]] = Nil,
+                         stepGroupByValues: Seq[Option[Any]] = Nil,
+                         filterOutValues: Seq[Option[Any]] = Nil,
+                         accumulatedScores: Map[String, Double] = Map.empty) {
+
+  def toValue(keyName: String): Option[Any] = keyName match {
+    case "from" | "_from" => Option(edge.srcId)
+    case "to" | "_to" => Option(edge.tgtId)
+    case "label" => Option(label.label)
+    case "direction" => Option(edge.dir)
+    case "score" => Option(score)
+    case _ =>
+      label.metaPropsInvMap.get(keyName).flatMap { labelMeta =>
+        
edge.propsWithTs.get(labelMeta).orElse(label.metaPropsDefaultMapInner.get(labelMeta)).map(_.innerVal.value)
+      }
+  }
 
+  def toValues(keyNames: Seq[String]): Seq[Option[Any]] = for {
+    keyName <- keyNames
+  } yield toValue(keyName)
 
+}
 
 /** result */
+case class StepResult(edgeWithScores: Seq[EdgeWithScore],
+                      grouped: Seq[(StepResult.GroupByKey, (Double, 
StepResult.Values))],
+                      degreeEdges: Seq[EdgeWithScore],
+                      isFailure: Boolean = false,
+                      accumulatedCursors: Seq[Seq[Array[Byte]]] = Nil,
+                      cursors: Seq[Array[Byte]] = Nil,
+                      failCount: Int = 0) {
+  //  val isInnerEmpty = innerResults.isEmpty
+  val isEmpty = edgeWithScores.isEmpty
+}
 
 object StepResult {
 
-  type Values = Seq[S2EdgeWithScore]
+  type Values = Seq[EdgeWithScore]
   type GroupByKey = Seq[Option[Any]]
   val EmptyOrderByValues = (None, None, None, None)
   val Empty = StepResult(Nil, Nil, Nil)
-
+  val Failure = StepResult(Nil, Nil, Nil, true, failCount = 1)
 
   def mergeOrdered(left: StepResult.Values,
                    right: StepResult.Values,
@@ -99,177 +134,182 @@ object StepResult {
     }
   }
 
+  def filterOutStepGroupBy(edgesWithScores: Seq[EdgeWithScore],
+                                   groupBy: GroupBy): Seq[EdgeWithScore] =
+    if (groupBy == GroupBy.Empty) edgesWithScores
+    else {
+      groupBy.minShouldMatch match {
+        case None => edgesWithScores
+        case Some(minShouldMatch) =>
+          val MinShouldMatchParam(propKey, count, terms) = minShouldMatch
+
+          val grouped = edgesWithScores.groupBy { edgeWithScore =>
+            edgeWithScore.stepGroupByValues
+          }.filter { case (key, edges) =>
+            val filtered = edges.toStream.filter{ e =>
+              e.toValue(propKey) match {
+                case None => false
+                case Some(v) => terms.contains(v)
+              }
+            }.take(count)
+
+            filtered.lengthCompare(count) >= 0
+          }
+
+          grouped.values.flatten.toSeq
+      }
+    }
+
   def orderBy(queryOption: QueryOption, notOrdered: Values): Values = {
     import OrderingUtil._
-
-    if (queryOption.withScore && queryOption.orderByColumns.nonEmpty) {
-      
notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](queryOption.ascendingVals))
+    if (queryOption.withScore) {
+      val ascendingVals = if (queryOption.ascendingVals.isEmpty) 
QueryOption.DefaultAscendingVals else queryOption.ascendingVals
+      
notOrdered.sortBy(_.orderByValues)(TupleMultiOrdering[Any](ascendingVals))
     } else {
       notOrdered
     }
   }
-  def toOrderByValues(s2Edge: Edge,
-                      score: Double,
-                      orderByKeys: Seq[String]): (Any, Any, Any, Any) = {
-    def toValue(propertyKey: String): Any = {
-      propertyKey match {
-        case "score" => score
-        case "timestamp" | "_timestamp" => s2Edge.ts
-        case _ => s2Edge.properties.get(propertyKey)
-      }
+
+  def updateScoreOnOrderByValues(scoreFieldIndex: Int,
+                                 orderByValues: (Any, Any, Any, Any),
+                                 newScore: Double): (Any, Any, Any, Any) = {
+    scoreFieldIndex match {
+      case 0 => (newScore, orderByValues._2, orderByValues._3, 
orderByValues._4)
+      case 1 => (orderByValues._1, newScore, orderByValues._3, 
orderByValues._4)
+      case 2 => (orderByValues._1, orderByValues._2, newScore, 
orderByValues._4)
+      case 3 => (orderByValues._1, orderByValues._2, orderByValues._3, 
newScore)
+      case _ => orderByValues
     }
-    if (orderByKeys.isEmpty) (None, None, None, None)
-    else {
-      orderByKeys.length match {
-        case 1 =>
-          (toValue(orderByKeys(0)), None, None, None)
-        case 2 =>
-          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), None, None)
-        case 3 =>
-          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), 
toValue(orderByKeys(2)), None)
-        case _ =>
-          (toValue(orderByKeys(0)), toValue(orderByKeys(1)), 
toValue(orderByKeys(2)), toValue(orderByKeys(3)))
-      }
+
+  }
+
+  def toTuple4(values: Seq[Option[Any]]): (Any, Any, Any, Any) = {
+    values.length match {
+      case 1 => (values(0).getOrElse(None), None, None, None)
+      case 2 => (values(0).getOrElse(None), values(1).getOrElse(None), None, 
None)
+      case 3 => (values(0).getOrElse(None), values(1).getOrElse(None), 
values(2).getOrElse(None), None)
+      case _ => (values(0).getOrElse(None), values(1).getOrElse(None), 
values(2).getOrElse(None), values(3).getOrElse(None))
     }
   }
+
   /**
    * merge multiple StepResult into one StepResult.
-   * @param queryOption
+   * @param globalQueryOption
    * @param multiStepResults
+   * @param weights
+   * @param filterOutStepResult
    * @return
    */
-  def merges(queryOption: QueryOption,
+  def merges(globalQueryOption: QueryOption,
              multiStepResults: Seq[StepResult],
-             weights: Seq[Double] = Nil): StepResult = {
+             weights: Seq[Double] = Nil,
+             filterOutStepResult: StepResult): StepResult = {
     val degrees = multiStepResults.flatMap(_.degreeEdges)
-    val ls = new mutable.ListBuffer[S2EdgeWithScore]()
-    val agg= new mutable.HashMap[GroupByKey, ListBuffer[S2EdgeWithScore]]()
+    val ls = new mutable.ListBuffer[EdgeWithScore]()
+    val agg= new mutable.HashMap[GroupByKey, ListBuffer[EdgeWithScore]]()
     val sums = new mutable.HashMap[GroupByKey, Double]()
 
+
+    val filterOutSet = 
filterOutStepResult.edgeWithScores.foldLeft(Set.empty[Seq[Option[Any]]]) { case 
(prev, t) =>
+      prev + t.filterOutValues
+    }
+
     for {
       (weight, eachStepResult) <- weights.zip(multiStepResults)
-      (ordered, grouped) = (eachStepResult.results, eachStepResult.grouped)
+      (ordered, grouped) = (eachStepResult.edgeWithScores, 
eachStepResult.grouped)
     } {
       ordered.foreach { t =>
-        val newScore = t.score * weight
-        ls += t.copy(score = newScore)
+        val filterOutKey = t.filterOutValues
+        if (!filterOutSet.contains(filterOutKey)) {
+          val newScore = t.score * weight
+          val newT = t.copy(score = newScore)
+
+          //          val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
+          val newOrderByValues =
+            if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.tsInnerVal, None, None)
+            else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
+
+          val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys)
+
+          ls += t.copy(score = newScore, orderByValues = newOrderByValues, 
groupByValues = newGroupByValues)
+        }
       }
 
       // process each query's stepResult's grouped
       for {
         (groupByKey, (scoreSum, values)) <- grouped
       } {
-        val buffer = agg.getOrElseUpdate(groupByKey, 
ListBuffer.empty[S2EdgeWithScore])
+        val buffer = agg.getOrElseUpdate(groupByKey, 
ListBuffer.empty[EdgeWithScore])
         var scoreSum = 0.0
+        var isEmpty = true
         values.foreach { t =>
-          val newScore = t.score * weight
-          buffer += t.copy(score = newScore)
-          scoreSum += newScore
+          val filterOutKey = t.filterOutValues
+          if (!filterOutSet.contains(filterOutKey)) {
+            isEmpty = false
+            val newScore = t.score * weight
+            val newT = t.copy(score = newScore)
+//            val newOrderByValues = 
updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, 
newScore)
+
+            val newOrderByValues =
+              if (globalQueryOption.orderByKeys.isEmpty) (newScore, 
t.edge.tsInnerVal, None, None)
+              else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
+
+            val newGroupByValues = 
newT.toValues(globalQueryOption.groupBy.keys)
+
+            buffer += t.copy(score = newScore, orderByValues = 
newOrderByValues, groupByValues = newGroupByValues)
+            scoreSum += newScore
+          }
         }
-        sums += (groupByKey -> scoreSum)
+        if (!isEmpty) sums += (groupByKey -> scoreSum)
       }
     }
 
     // process global groupBy
-    if (queryOption.groupBy.keys.nonEmpty) {
+    val (ordered, grouped) = if (globalQueryOption.groupBy.keys.nonEmpty) {
       for {
-        s2EdgeWithScore <- ls
-        groupByKey = 
s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys)
+        edgeWithScore <- ls
+        groupByKey = edgeWithScore.groupByValues
       } {
-        val buffer = agg.getOrElseUpdate(groupByKey, 
ListBuffer.empty[S2EdgeWithScore])
-        buffer += s2EdgeWithScore
-        val newScore = sums.getOrElse(groupByKey, 0.0) + s2EdgeWithScore.score
+        val buffer = agg.getOrElseUpdate(groupByKey, 
ListBuffer.empty[EdgeWithScore])
+        buffer += edgeWithScore
+        val newScore = sums.getOrElse(groupByKey, 0.0) + edgeWithScore.score
         sums += (groupByKey -> newScore)
       }
+      val grouped = for {
+        (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
+        aggregated = agg(groupByKey) if aggregated.nonEmpty
+        sorted = orderBy(globalQueryOption, aggregated)
+      } yield groupByKey -> (scoreSum, sorted)
+      (Nil, grouped)
+    } else {
+      val ordered = orderBy(globalQueryOption, ls)
+      (ordered, Nil)
     }
 
-
-    val ordered = orderBy(queryOption, ls)
-    val grouped = for {
-      (groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
-      aggregated = agg(groupByKey) if aggregated.nonEmpty
-    } yield groupByKey -> (scoreSum, aggregated)
-
-    StepResult(results = ordered, grouped = grouped, degrees)
+    StepResult(edgeWithScores = ordered, grouped = grouped, degrees, failCount 
= multiStepResults.map(_.failCount).sum)
   }
 
   //TODO: Optimize this.
   def filterOut(graph: Graph,
                 queryOption: QueryOption,
                 baseStepResult: StepResult,
-                filterOutStepInnerResult: StepInnerResult): StepResult = {
+                filterOutStepResult: StepResult): StepResult = {
 
-    val fields = if (queryOption.filterOutFields.isEmpty) Seq("to") else 
Seq("to")
-    //    else queryOption.filterOutFields
-    val filterOutSet = filterOutStepInnerResult.edgesWithScoreLs.map { t =>
-      t.edge.selectValues(fields)
-    }.toSet
+    val filterOutSet = 
filterOutStepResult.edgeWithScores.foldLeft(Set.empty[Seq[Option[Any]]]) { case 
(prev, t) =>
+      prev + t.filterOutValues
+    }
 
-    val filteredResults = baseStepResult.results.filter { t =>
-      val filterOutKey = t.s2Edge.selectValues(fields)
+    val filteredResults = baseStepResult.edgeWithScores.filter { t =>
+      val filterOutKey = t.filterOutValues
       !filterOutSet.contains(filterOutKey)
     }
 
     val grouped = for {
       (key, (scoreSum, values)) <- baseStepResult.grouped
-      (out, in) = values.partition(v => 
filterOutSet.contains(v.s2Edge.selectValues(fields)))
+      (out, in) = values.partition(v => 
filterOutSet.contains(v.filterOutValues))
       newScoreSum = scoreSum - out.foldLeft(0.0) { case (prev, current) => 
prev + current.score } if in.nonEmpty
     } yield key -> (newScoreSum, in)
 
-
-    StepResult(results = filteredResults, grouped = grouped, 
baseStepResult.degreeEdges)
-  }
-  def apply(graph: Graph,
-            queryOption: QueryOption,
-            stepInnerResult: StepInnerResult): StepResult = {
-        logger.debug(s"[BeforePostProcess]: 
${stepInnerResult.edgesWithScoreLs.size}")
-
-    val results = for {
-      edgeWithScore <- stepInnerResult.edgesWithScoreLs
-    } yield {
-        val edge = edgeWithScore.edge
-        val orderByValues =
-          if (queryOption.orderByColumns.isEmpty) (edgeWithScore.score, None, 
None, None)
-          else toOrderByValues(edge, edgeWithScore.score, 
queryOption.orderByKeys)
-
-        S2EdgeWithScore(edge, edgeWithScore.score, orderByValues, 
edgeWithScore.edge.parentEdges)
-      }
-    /** ordered flatten result */
-    val ordered = orderBy(queryOption, results)
-
-    /** ordered grouped result */
-    val grouped =
-      if (queryOption.groupBy.keys.isEmpty) Nil
-      else {
-        val agg = new mutable.HashMap[GroupByKey, (Double, Values)]()
-        results.groupBy { s2EdgeWithScore =>
-          s2EdgeWithScore.s2Edge.selectValues(queryOption.groupBy.keys, 
useToString = true)
-        }.map { case (k, ls) =>
-          val (scoreSum, merged) = mergeOrdered(ls, Nil, queryOption)
-          /**
-           * watch out here. by calling toString on Any, we lose type 
information which will be used
-           * later for toJson.
-           */
-          if (merged.nonEmpty) {
-            val newKey = 
merged.head.s2Edge.selectValues(queryOption.groupBy.keys, useToString = false)
-            agg += (newKey -> (scoreSum, merged))
-          }
-        }
-        agg.toSeq.sortBy(_._2._1 * -1)
-      }
-
-    val degrees = stepInnerResult.degreeEdges.map(t => S2EdgeWithScore(t.edge, 
t.score))
-    StepResult(results = ordered, grouped = grouped, degreeEdges = degrees)
+    StepResult(edgeWithScores = filteredResults, grouped = grouped, 
baseStepResult.degreeEdges, cursors = baseStepResult.cursors, failCount = 
baseStepResult.failCount + filterOutStepResult.failCount)
   }
 }
-
-case class S2EdgeWithScore(s2Edge: Edge,
-                           score: Double,
-                           orderByValues: (Any, Any, Any, Any) = 
StepResult.EmptyOrderByValues,
-                           parentEdges: Seq[EdgeWithScore] = Nil)
-
-case class StepResult(results: StepResult.Values,
-                      grouped: Seq[(StepResult.GroupByKey, (Double, 
StepResult.Values))],
-                      degreeEdges: StepResult.Values) {
-  val isEmpty = results.isEmpty
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index 0377bd8..bbd71ec 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -23,49 +23,6 @@ import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, 
VertexId}
 import play.api.libs.json.Json
-//
-//object S2Vertex {
-//  def apply(graph: Graph, vertex: Vertex): S2Vertex = {
-//    S2Vertex(graph,
-//      vertex.serviceName,
-//      vertex.serviceColumn.columnName,
-//      vertex.innerIdVal,
-//      vertex.serviceColumn.innerValsToProps(vertex.props),
-//      vertex.ts,
-//      GraphUtil.fromOp(vertex.op)
-//    )
-//  }
-//}
-//
-//case class S2Vertex(graph: Graph,
-//                    serviceName: String,
-//                    columnName: String,
-//                    id: Any,
-//                    props: Map[String, Any] = Map.empty,
-//                    ts: Long = System.currentTimeMillis(),
-//                    operation: String = "insert") extends GraphElement {
-//  lazy val vertex = {
-//    val service = Service.findByName(serviceName).getOrElse(throw new 
RuntimeException(s"$serviceName is not found."))
-//    val column = ServiceColumn.find(service.id.get, 
columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
-//    val op = GraphUtil.toOp(operation).getOrElse(throw new 
RuntimeException(s"$operation is not supported."))
-//
-//    val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, 
column.columnType, column.schemaVersion))
-//    val propsInner = column.propsToInnerVals(props) ++
-//      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, 
column.schemaVersion))
-//
-//    Vertex(srcVertexId, ts, propsInner, op)
-//  }
-//
-//  val uniqueId = (serviceName, columnName, id)
-//
-//  override def isAsync: Boolean = vertex.isAsync
-//
-//  override def toLogString(): String = vertex.toLogString()
-//
-//  override def queueKey: String = vertex.queueKey
-//
-//  override def queuePartitionKey: String = vertex.queuePartitionKey
-//}
 case class Vertex(id: VertexId,
                   ts: Long = System.currentTimeMillis(),
                   props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
@@ -109,22 +66,6 @@ case class Vertex(id: VertexId,
     meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte)
   } yield (meta.name -> v.toString)
 
-  //  /** only used by bulk loader */
-  //  def buildPuts(): List[Put] = {
-  //    //    logger.error(s"put: $this => $rowKey")
-  ////    val put = new Put(rowKey.bytes)
-  ////    for ((q, v) <- qualifiersWithValues) {
-  ////      put.addColumn(vertexCf, q, ts, v)
-  ////    }
-  ////    List(put)
-  //    val kv = kvs.head
-  //    val put = new Put(kv.row)
-  //    kvs.map { kv =>
-  //      put.addColumn(kv.cf, kv.qualifier, kv.timestamp, kv.value)
-  //    }
-  //    List(put)
-  //  }
-
   def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
index bcd5f0a..43e5db8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
@@ -19,12 +19,10 @@
 
 package org.apache.s2graph.core.mysqls
 
-/**
- * Created by shon on 8/5/15.
- */
-
 import scalikejdbc._
 
+import scala.util.Try
+
 object Bucket extends Model[Bucket] {
 
   val rangeDelimiter = "~"
@@ -53,7 +51,7 @@ object Bucket extends Model[Bucket] {
 
   def toRange(str: String): Option[(Int, Int)] = {
     val range = str.split(rangeDelimiter)
-    if (range.length == 2) Option((range.head.toInt, range.last.toInt))
+    if (range.length == 2) Option(range.head.toInt, range.last.toInt)
     else None
   }
 
@@ -69,6 +67,24 @@ object Bucket extends Model[Bucket] {
       sql.single().apply()
     }
   }
+
+  def insert(experiment: Experiment, modular: String, httpVerb: String, 
apiPath: String,
+             requestBody: String, timeout: Int, impressionId: String,
+             isGraphQuery: Boolean, isEmpty: Boolean)
+            (implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+            INSERT INTO buckets(experiment_id, modular, http_verb, api_path, 
request_body, timeout, impression_id,
+             is_graph_query, is_empty)
+            VALUES (${experiment.id.get}, $modular, $httpVerb, $apiPath, 
$requestBody, $timeout, $impressionId,
+             $isGraphQuery, $isEmpty)
+        """
+        .updateAndReturnGeneratedKey().apply()
+    }.map { newId =>
+      Bucket(Some(newId.toInt), experiment.id.get, modular, httpVerb, apiPath, 
requestBody, timeout, impressionId,
+        isGraphQuery, isEmpty)
+    }
+  }
 }
 
 case class Bucket(id: Option[Int],

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
index cd825f4..0b16449 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.mysqls
 import org.apache.s2graph.core.GraphUtil
 import scalikejdbc._
 
-import scala.util.Random
+import scala.util.{Try, Random}
 
 object Experiment extends Model[Experiment] {
   val ImpressionKey = "S2-Impression-Id"
@@ -60,6 +60,17 @@ object Experiment extends Model[Experiment] {
         .map { rs => Experiment(rs) }.single.apply
     )
   }
+
+  def insert(service: Service, name: String, description: String, 
experimentType: String = "t", totalModular: Int = 100)
+            (implicit session: DBSession = AutoSession): Try[Experiment] = {
+    Try {
+      sql"""INSERT INTO experiments(service_id, service_name, `name`, 
description, experiment_type, total_modular)
+         VALUES(${service.id.get}, ${service.serviceName}, $name, 
$description, $experimentType, $totalModular)"""
+        .updateAndReturnGeneratedKey().apply()
+    }.map { newId =>
+      Experiment(Some(newId.toInt), service.id.get, name, description, 
experimentType, totalModular)
+    }
+  }
 }
 
 case class Experiment(id: Option[Int],

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index f7318f6..fdef677 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -21,13 +21,14 @@ package org.apache.s2graph.core.mysqls
 
 import java.util.Calendar
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
 import org.apache.s2graph.core.GraphUtil
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, 
InnerValLikeWithTs}
 import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.{JsObject, JsValue, Json}
+import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
 import scalikejdbc._
 
 object Label extends Model[Label] {
@@ -57,7 +58,8 @@ object Label extends Model[Label] {
       sql"""
         select *
         from labels
-        where label = ${labelName}""".map { rs => Label(rs) }.single.apply()
+        where label = ${labelName}
+        and deleted_at is null """.map { rs => Label(rs) }.single.apply()
 
     if (useCache) withCache(cacheKey)(labelOpt)
     else labelOpt
@@ -101,7 +103,8 @@ object Label extends Model[Label] {
       sql"""
         select         *
         from   labels
-        where  id = ${id}"""
+        where  id = ${id}
+        and deleted_at is null"""
         .map { rs => Label(rs) }.single.apply())
   }
 
@@ -111,7 +114,8 @@ object Label extends Model[Label] {
       sql"""
         select         *
         from   labels
-        where  id = ${id}"""
+        where  id = ${id}
+        and deleted_at is null"""
         .map { rs => Label(rs) }.single.apply()).get
   }
 
@@ -124,6 +128,7 @@ object Label extends Model[Label] {
           from labels
           where        tgt_column_name = ${col.columnName}
           and service_id = ${col.serviceId}
+          and deleted_at is null
         """.map { rs => Label(rs) }.list().apply())
   }
 
@@ -136,20 +141,21 @@ object Label extends Model[Label] {
           from labels
           where        src_column_name = ${col.columnName}
           and service_id = ${col.serviceId}
+          and deleted_at is null
         """.map { rs => Label(rs) }.list().apply())
   }
 
   def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
     val cacheKey = "srcServiceId=" + serviceId
     withCaches(cacheKey)(
-      sql"""select * from labels where src_service_id = ${serviceId}""".map { 
rs => Label(rs) }.list().apply
+      sql"""select * from labels where src_service_id = ${serviceId} and 
deleted_at is null""".map { rs => Label(rs) }.list().apply
     )
   }
 
   def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = 
AutoSession): List[Label] = {
     val cacheKey = "tgtServiceId=" + serviceId
     withCaches(cacheKey)(
-      sql"""select * from labels where tgt_service_id = ${serviceId}""".map { 
rs => Label(rs) }.list().apply
+      sql"""select * from labels where tgt_service_id = ${serviceId} and 
deleted_at is null""".map { rs => Label(rs) }.list().apply
     )
   }
 
@@ -183,14 +189,14 @@ object Label extends Model[Label] {
         val tgtServiceId = tgtService.id.get
         val serviceId = service.id.get
 
-        /* insert serviceColumn */
+        /** insert serviceColumn */
         val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, 
Some(srcColumnType), schemaVersion)
         val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, 
Some(tgtColumnType), schemaVersion)
 
         if (srcCol.columnType != srcColumnType) throw new 
RuntimeException(s"source service column type not matched ${srcCol.columnType} 
!= ${srcColumnType}")
         if (tgtCol.columnType != tgtColumnType) throw new 
RuntimeException(s"target service column type not matched ${tgtCol.columnType} 
!= ${tgtColumnType}")
 
-        /* create label */
+        /** create label */
         Label.findByName(labelName, useCache = false).getOrElse {
 
           val createdId = insert(labelName, srcServiceId, srcColumnName, 
srcColumnType,
@@ -224,7 +230,7 @@ object Label extends Model[Label] {
   }
 
   def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from labels""".map { rs => Label(rs) 
}.list().apply()
+    val ls = sql"""select * from labels where deleted_at is null""".map { rs 
=> Label(rs) }.list().apply()
     putsToCache(ls.map { x =>
       val cacheKey = s"id=${x.id.get}"
       (cacheKey -> x)
@@ -233,6 +239,7 @@ object Label extends Model[Label] {
       val cacheKey = s"label=${x.label}"
       (cacheKey -> x)
     })
+    ls
   }
 
   def updateName(oldName: String, newName: String)(implicit session: DBSession 
= AutoSession) = {
@@ -264,6 +271,21 @@ object Label extends Model[Label] {
     }
     cnt
   }
+
+  def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = {
+
+    logger.info(s"mark deleted label: $label")
+    val oldName = label.label
+    val now = Calendar.getInstance().getTime
+    val newName = s"deleted_${now.getTime}_"+ label.label
+    val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} 
where id = ${label.id.get}""".update.apply()
+    val cacheKeys = List(s"id=${label.id}", s"label=${oldName}")
+    cacheKeys.foreach { key =>
+      expireCache(key)
+      expireCaches(key)
+    }
+    cnt
+  }
 }
 
 case class Label(id: Option[Int], label: String,
@@ -274,10 +296,9 @@ case class Label(id: Option[Int], label: String,
                  schemaVersion: String, isAsync: Boolean = false,
                  compressionAlgorithm: String,
                  options: Option[String]) {
+  def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, 
useCache = useCache)
 
-  def metas = LabelMeta.findAllByLabelId(id.get)
-
-  def metaSeqsToNames = metas.map(x => (x.seq, x.name)) toMap
+  def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, 
useCache = useCache)
 
   //  lazy val firstHBaseTableName = 
hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
   lazy val srcService = Service.findById(srcServiceId)
@@ -356,83 +377,131 @@ case class Label(id: Option[Int], label: String,
 //    }
 //  }"""))
 
-  lazy val extraOptions: Map[String, JsValue] = options match {
-    case None => Map.empty
-    case Some(v) =>
-      try {
-        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap 
}.getOrElse(Map.empty)
-      } catch {
-        case e: Exception =>
-          logger.error(s"An error occurs while parsing the extra label option: 
${label}", e)
-          Map.empty
-      }
+  lazy val tokens: Set[String] = 
extraOptions.get("tokens").fold(Set.empty[String]) {
+    case JsArray(tokens) => tokens.map(_.as[String]).toSet
+    case _ =>
+      logger.error("Invalid token JSON")
+      Set.empty[String]
+  }
+
+  lazy val extraOptions = Model.extraOptions(options)
+
+  lazy val durability = 
extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)
+
+  lazy val storageConfigOpt: Option[Config] = toStorageConfig
+
+  def toStorageConfig: Option[Config] = {
+    Model.toStorageConfig(extraOptions)
   }
 
+
   def srcColumnWithDir(dir: Int) = {
-    if (dir == GraphUtil.directions("out")) srcColumn else tgtColumn
+    // GraphUtil.directions("out"
+    if (dir == 0) srcColumn else tgtColumn
   }
 
   def tgtColumnWithDir(dir: Int) = {
-    if (dir == GraphUtil.directions("out")) tgtColumn else srcColumn
+    // GraphUtil.directions("out"
+    if (dir == 0) tgtColumn else srcColumn
   }
 
-  def srcTgtColumn(dir: Int) =
-    if (isDirected) {
-      (srcColumnWithDir(dir), tgtColumnWithDir(dir))
-    } else {
-      if (dir == GraphUtil.directions("in")) {
-        (tgtColumn, srcColumn)
-      } else {
-        (srcColumn, tgtColumn)
-      }
-    }
-
-  def init() = {
-    metas
-    metaSeqsToNames
-    service
-    srcColumn
-    tgtColumn
-    defaultIndex
-    indices
-    metaProps
-  }
+  lazy val tgtSrc = (tgtColumn, srcColumn)
+  lazy val srcTgt = (srcColumn, tgtColumn)
+
+  def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt
+
+  lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0))
+//  def init() = {
+//    metas()
+//    metaSeqsToNames()
+//    service
+//    srcColumn
+//    tgtColumn
+//    defaultIndex
+//    indices
+//    metaProps
+//  }
+
+  //  def srcColumnInnerVal(jsValue: JsValue) = {
+  //    jsValueToInnerVal(jsValue, srcColumnType, version)
+  //  }
+  //  def tgtColumnInnerVal(jsValue: JsValue) = {
+  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
+  //  }
 
   override def toString(): String = {
     val orderByKeys = LabelMeta.findAllByLabelId(id.get)
     super.toString() + orderByKeys.toString()
   }
 
-  lazy val toJson = Json.obj("labelName" -> label,
-    "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
-    "isDirected" -> isDirected,
-    "serviceName" -> serviceName,
-    "consistencyLevel" -> consistencyLevel,
-    "schemaVersion" -> schemaVersion,
-    "isAsync" -> isAsync,
-    "compressionAlgorithm" -> compressionAlgorithm,
-    "defaultIndex" -> defaultIndex.map(x => x.toJson),
-    "extraIndex" -> extraIndices.map(exIdx => exIdx.toJson),
-    "metaProps" -> metaProps.filter { labelMeta => 
LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson)
-  )
-
+  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
+  //    if (scoring.isEmpty) LabelIndex.defaultSeq
+  //    else {
+  //      LabelIndex.findByLabelIdAndSeqs(id.get, 
scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
+  //
+  ////      LabelIndex.findByLabelIdAndSeqs(id.get, 
scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
+  //    }
+  //  }
+  lazy val toJson = {
+    val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false)
+    val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, 
LabelIndex.DefaultSeq, useCache = false)
+    val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && 
idx.id.get != defaultIdxOpt.get.id.get)
+    val metaProps = LabelMeta.reservedMetas.map { m =>
+      if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
+      else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
+      else m
+    } ::: LabelMeta.findAllByLabelId(id.get, useCache = false)
+
+    val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj())
+    val optionsJs = try {
+      val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject]
+      if (!obj.value.contains("tokens")) obj
+      else obj ++ Json.obj("tokens" -> 
obj.value("tokens").as[Seq[String]].map("*" * _.length))
+
+    } catch { case e: Exception => Json.obj() }
+
+    Json.obj("labelName" -> label,
+      "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
+      "isDirected" -> isDirected,
+      "serviceName" -> serviceName,
+      "consistencyLevel" -> consistencyLevel,
+      "schemaVersion" -> schemaVersion,
+      "isAsync" -> isAsync,
+      "compressionAlgorithm" -> compressionAlgorithm,
+      "defaultIndex" -> defaultIdx,
+      "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson),
+      "metaProps" -> metaProps.filter { labelMeta => 
LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson),
+      "options" -> optionsJs
+    )
+  }
 
   def propsToInnerValsWithTs(props: Map[String, Any],
-                             ts: Long = System.currentTimeMillis()): Map[Byte, 
InnerValLikeWithTs] = {
+                             ts: Long = System.currentTimeMillis()): 
Map[LabelMeta, InnerValLikeWithTs] = {
     for {
       (k, v) <- props
       labelMeta <- metaPropsInvMap.get(k)
       innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
-    } yield labelMeta.seq -> InnerValLikeWithTs(innerVal, ts)
+    } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)
 
   }
 
-  def innerValsWithTsToProps(props: Map[Byte, InnerValLikeWithTs]): 
Map[String, Any] = {
-    for {
-      (k, v) <- props
-      labelMeta <- metaPropsMap.get(k)
-    } yield {
-      labelMeta.name -> v.innerVal.value
+  def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs],
+                             selectColumns: Map[Byte, Boolean]): Map[String, 
Any] = {
+    if (selectColumns.isEmpty) {
+      for {
+        (meta, v) <- metaPropsDefaultMapInner ++ props
+      } yield {
+        meta.name -> innerValToAny(v.innerVal, meta.dataType)
+      }
+    } else {
+      for {
+        (k, _) <- selectColumns
+        if k != LabelMeta.toSeq && k != LabelMeta.fromSeq
+        labelMeta <- metaPropsMap.get(k)
+      } yield {
+        val v = 
props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get
+        labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType)
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
index d7736bc..c548868 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
@@ -23,12 +23,14 @@ package org.apache.s2graph.core.mysqls
  * Created by shon on 6/3/15.
  */
 
-import play.api.libs.json.Json
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.utils.logger
+import play.api.libs.json.{JsObject, JsString, Json}
 import scalikejdbc._
 
 object LabelIndex extends Model[LabelIndex] {
   val DefaultName = "_PK"
-  val DefaultMetaSeqs = Seq(LabelMeta.timeStampSeq)
+  val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
   val DefaultSeq = 1.toByte
   val MaxOrderSeq = 7
 
@@ -144,16 +146,11 @@ object LabelIndex extends Model[LabelIndex] {
   }
 }
 
-/**
- * formular
- * ex1): w1, w2, w3
- * ex2): 1.5 * w1^2 + 3.4 * (w1 * w2), w2, w1
- */
-
 case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, 
metaSeqs: Seq[Byte], formulars: String) {
   lazy val label = Label.findById(labelId)
   lazy val metas = label.metaPropsMap
   lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => 
label.metaPropsMap.get(metaSeq))
+  lazy val sortKeyTypesArray = sortKeyTypes.toArray
   lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
   lazy val toJson = Json.obj(
     "name" -> name,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
index ef5d90a..4a7e931 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
@@ -32,40 +32,61 @@ object LabelMeta extends Model[LabelMeta] {
 
   /** dummy sequences */
 
-  val fromSeq = -4.toByte
-  val toSeq = -5.toByte
-  val lastOpSeq = -3.toByte
-  val lastDeletedAt = -2.toByte
-  val timeStampSeq = 0.toByte
+  val fromSeq = (-4).toByte
+  val toSeq = (-5).toByte
+  val lastOpSeq = (-3).toByte
+  val lastDeletedAtSeq = (-2).toByte
+  val timestampSeq = (0).toByte
+  val labelSeq = (-6).toByte
+  val directionSeq = -7.toByte
+  val fromHashSeq = -8.toByte
+
   val countSeq = (Byte.MaxValue - 2).toByte
   val degreeSeq = (Byte.MaxValue - 1).toByte
   val maxValue = Byte.MaxValue
-  val emptyValue = Byte.MaxValue
+  val emptySeq = Byte.MaxValue
 
   /** reserved sequences */
   //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = 
lastDeletedAt, name = "lastDeletedAt",
   //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
+  val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = 
"_from_hash",
+    seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long")
   val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
-    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "long")
+    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string")
   val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
-    seq = toSeq, defaultValue = toSeq.toString, dataType = "long")
+    seq = toSeq, defaultValue = toSeq.toString, dataType = "string")
   val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
-    seq = timeStampSeq, defaultValue = "0", dataType = "long")
+    seq = timestampSeq, defaultValue = "0", dataType = "long")
   val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
     seq = degreeSeq, defaultValue = "0", dataType = "long")
   val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
     seq = countSeq, defaultValue = "-1", dataType = "long")
+  val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = 
"_lastDeletedAt",
+    seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long")
+  val label = LabelMeta(id = Some(-1), labelId = -1, name = "label",
+    seq = labelSeq, defaultValue = "", dataType = "string")
+  val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction",
+    seq = directionSeq, defaultValue = "out", dataType = "string")
+  val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty",
+    seq = emptySeq, defaultValue = "-1", dataType = "long")
 
   // Each reserved column(_timestamp, timestamp) has same seq number, starts 
with '_' has high priority
-  val reservedMetas = List(from, to, degree, timestamp, count).flatMap { lm => 
List(lm, lm.copy(name = lm.name.drop(1))) }.reverse
-  val reservedMetasInner = List(from, to, degree, timestamp, count)
+  val reservedMetas = List(empty, label, direction, lastDeletedAt, from, 
fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = 
lm.name.drop(1))) }.reverse
+  val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, 
fromHash, to, degree, timestamp, count)
+
+  val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", 
"_from_hash", "label", "direction", "timestamp", "_timestamp")
 
   def apply(rs: WrappedResultSet): LabelMeta = {
     LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), 
rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase)
   }
 
-  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq
-  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq
+  /** Note: DegreeSeq should not be included in serializer/deserializer.
+    * only 0 <= seq <= CountSeq(Int.MaxValue - 2), not DegreeSet(Int.MaxValue 
- 1) should be
+    * included in actual bytes in storage.
+    * */
+  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq 
== fromHashSeq
+
+  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || 
seq == fromHashSeq
 
   def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta 
= {
     val cacheKey = "id=" + id
@@ -164,6 +185,21 @@ object LabelMeta extends Model[LabelMeta] {
   }
 }
 
-case class LabelMeta(id: Option[Int], labelId: Int, name: String, seq: Byte, 
defaultValue: String, dataType: String) {
+case class LabelMeta(id: Option[Int],
+                     labelId: Int,
+                     name: String,
+                     seq: Byte,
+                     defaultValue: String,
+                     dataType: String) {
   lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, 
"dataType" -> dataType)
+  override def equals(other: Any): Boolean = {
+    if (!other.isInstanceOf[LabelMeta]) false
+    else {
+      val o = other.asInstanceOf[LabelMeta]
+//      labelId == o.labelId &&
+        seq == o.seq
+    }
+  }
+  override def hashCode(): Int = seq.toInt
+//    (labelId, seq).hashCode()
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
index bf6dfb7..7a18a49 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
@@ -21,8 +21,10 @@ package org.apache.s2graph.core.mysqls
 
 import java.util.concurrent.Executors
 
-import com.typesafe.config.Config
+import com.typesafe.config.{ConfigFactory, Config}
+import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
+import play.api.libs.json.{Json, JsObject, JsValue}
 import scalikejdbc._
 
 import scala.concurrent.ExecutionContext
@@ -36,6 +38,7 @@ object Model {
   val numOfThread = Runtime.getRuntime.availableProcessors()
   val threadPool = Executors.newFixedThreadPool(numOfThread)
   val ec = ExecutionContext.fromExecutor(threadPool)
+  val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"
 
   def apply(config: Config) = {
     maxSize = config.getInt("cache.max.size")
@@ -116,6 +119,34 @@ object Model {
     LabelIndex.findAll()
     ColumnMeta.findAll()
   }
+
+  def extraOptions(options: Option[String]): Map[String, JsValue] = options 
match {
+    case None => Map.empty
+    case Some(v) =>
+      try {
+        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap 
}.getOrElse(Map.empty)
+      } catch {
+        case e: Exception =>
+          logger.error(s"An error occurs while parsing the extra label 
option", e)
+          Map.empty
+      }
+  }
+
+  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
+    try {
+      options.get("storage").map { jsValue =>
+        import scala.collection.JavaConverters._
+        val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, 
value) =>
+          key -> JSONParser.jsValueToAny(value).getOrElse(throw new 
RuntimeException("!!"))
+        }
+        ConfigFactory.parseMap(configMap.asJava)
+      }
+    } catch {
+      case e: Exception =>
+        logger.error(s"toStorageConfig error. use default storage", e)
+        None
+    }
+  }
 }
 
 trait Model[V] extends SQLSyntaxSupport[V] {
@@ -145,5 +176,9 @@ trait Model[V] extends SQLSyntaxSupport[V] {
   def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
     case (key, values) => listCache.put(key, values)
   }
+
+  def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) 
= {
+    (optionCache.getAllData(), listCache.getAllData())
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
index 3e01014..7fdda45 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core.mysqls
 
 import java.util.UUID
 
+import com.typesafe.config.Config
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.Json
 import scalikejdbc._
@@ -94,6 +95,7 @@ object Service extends Model[Service] {
       val cacheKey = s"serviceName=${x.serviceName}"
       (cacheKey -> x)
     })
+    ls
   }
 
   def findAllConn()(implicit session: DBSession = AutoSession): List[String] = 
{
@@ -101,7 +103,14 @@ object Service extends Model[Service] {
   }
 }
 
-case class Service(id: Option[Int], serviceName: String, accessToken: String, 
cluster: String, hTableName: String, preSplitSize: Int, hTableTTL: Option[Int]) 
{
+case class Service(id: Option[Int],
+                   serviceName: String,
+                   accessToken: String,
+                   cluster: String,
+                   hTableName: String,
+                   preSplitSize: Int,
+                   hTableTTL: Option[Int],
+                   options: Option[String] = None) {
   lazy val toJson =
     id match {
       case Some(_id) =>
@@ -110,4 +119,8 @@ case class Service(id: Option[Int], serviceName: String, 
accessToken: String, cl
       case None =>
         Json.parse("{}")
     }
+
+  lazy val extraOptions = Model.extraOptions(options)
+  lazy val storageConfigOpt: Option[Config] = toStorageConfig
+  def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 1c93667..effb94b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -19,11 +19,13 @@
 
 package org.apache.s2graph.core.parsers
 
-import org.apache.s2graph.core.GraphExceptions.WhereParserException
+import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, 
WhereParserException}
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.Edge
+import org.apache.s2graph.core.{Edge, GraphUtil}
 import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.utils.logger
+
 import scala.annotation.tailrec
 import scala.util.Try
 import scala.util.parsing.combinator.JavaTokenParsers
@@ -37,12 +39,11 @@ trait ExtractValue {
     val label = parentEdge.label
     val metaPropInvMap = label.metaPropsInvMap
     val labelMeta = metaPropInvMap.getOrElse(propKey, throw 
WhereParserException(s"Where clause contains not existing property name: 
$propKey"))
-    val metaSeq = labelMeta.seq
 
-    metaSeq match {
-      case LabelMeta.from.seq => parentEdge.srcVertex.innerId
-      case LabelMeta.to.seq => parentEdge.tgtVertex.innerId
-      case _ => parentEdge.propsWithTs.get(metaSeq) match {
+    labelMeta match {
+      case LabelMeta.from => parentEdge.srcVertex.innerId
+      case LabelMeta.to => parentEdge.tgtVertex.innerId
+      case _ => parentEdge.propsWithTs.get(labelMeta) match {
         case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, 
label.schemaVersion)
         case Some(edgeVal) => edgeVal.innerVal
       }
@@ -99,7 +100,13 @@ trait Clause extends ExtractValue {
     binOp(propValue, compValue)
   }
 }
-
+object Where {
+  def apply(labelName: String, sql: String): Try[Where] = {
+    val label = Label.findByName(labelName).getOrElse(throw new 
LabelNotExistException(labelName))
+    val parser = new WhereParser(label)
+    parser.parse(sql)
+  }
+}
 case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) {
   def filter(edge: Edge) =
     if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity)
@@ -118,18 +125,36 @@ case class Eq(propKey: String, value: String) extends 
Clause {
 }
 
 case class InWithoutParent(label: Label, propKey: String, values: Set[String]) 
extends Clause {
-  val innerValLikeLs = values.map { value =>
+  lazy val innerValLikeLsOut = values.map { value =>
     val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw 
WhereParserException(s"Where clause contains not existing property name: 
$propKey"))
     val dataType = propKey match {
       case "_to" | "to" => label.tgtColumn.columnType
       case "_from" | "from" => label.srcColumn.columnType
       case _ => labelMeta.dataType
     }
+
+    toInnerVal(value, dataType, label.schemaVersion)
+  }
+
+  lazy val innerValLikeLsIn = values.map { value =>
+    val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw 
WhereParserException(s"Where clause contains not existing property name: 
$propKey"))
+    val dataType = propKey match {
+      case "_to" | "to" => label.srcColumn.columnType
+      case "_from" | "from" => label.tgtColumn.columnType
+      case _ => labelMeta.dataType
+    }
+
     toInnerVal(value, dataType, label.schemaVersion)
   }
+
   override def filter(edge: Edge): Boolean = {
-    val propVal = propToInnerVal(edge, propKey)
-    innerValLikeLs.contains(propVal)
+    if (edge.dir == GraphUtil.directions("in")) {
+      val propVal = propToInnerVal(edge, propKey)
+      innerValLikeLsIn.contains(propVal)
+    } else {
+      val propVal = propToInnerVal(edge, propKey)
+      innerValLikeLsOut.contains(propVal)
+    }
   }
 }
 
@@ -166,11 +191,15 @@ case class Or(left: Clause, right: Clause) extends Clause 
{
 
 object WhereParser {
   val success = Where()
+
 }
 
 case class WhereParser(label: Label) extends JavaTokenParsers {
 
-  val anyStr = "[^\\s(),]+".r
+
+  override val stringLiteral = (("'" ~> "(\\\\'|[^'])*".r <~ "'" ) ^^ 
(_.replace("\\'", "'"))) | anyStr
+
+  val anyStr = "[^\\s(),']+".r
 
   val and = "and|AND".r
 
@@ -190,33 +219,35 @@ case class WhereParser(label: Label) extends 
JavaTokenParsers {
 
   def identWithDot: Parser[String] = repsep(ident, ".") ^^ { case values => 
values.mkString(".") }
 
-  def predicate = {
-    identWithDot ~ ("!=" | "=") ~ anyStr ^^ {
-      case f ~ op ~ s =>
-        if (op == "=") Eq(f, s)
-        else Not(Eq(f, s))
-    } | identWithDot ~ (">=" | "<=" | ">" | "<") ~ anyStr ^^ {
-      case f ~ op ~ s => op match {
-        case ">" => Gt(f, s)
-        case ">=" => Or(Gt(f, s), Eq(f, s))
-        case "<" => Lt(f, s)
-        case "<=" => Or(Lt(f, s), Eq(f, s))
-      }
-    } | identWithDot ~ (between ~> anyStr <~ and) ~ anyStr ^^ {
-      case f ~ minV ~ maxV => Between(f, minV, maxV)
-    } | identWithDot ~ (notIn | in) ~ ("(" ~> repsep(anyStr, ",") <~ ")") ^^ {
-      case f ~ op ~ values =>
-        val inClause =
-          if (f.startsWith("_parent")) IN(f, values.toSet)
-          else InWithoutParent(label, f, values.toSet)
-        if (op.toLowerCase == "in") inClause
-        else Not(inClause)
-
-
-      case _ => throw WhereParserException(s"Failed to parse where clause. ")
+  val _eq = identWithDot ~ ("!=" | "=") ~ stringLiteral ^^ {
+    case f ~ op ~ s => if (op == "=") Eq(f, s) else Not(Eq(f, s))
+  }
+
+  val _ltGt = identWithDot ~ (">=" | "<=" | ">" | "<") ~ stringLiteral ^^ {
+    case f ~ op ~ s => op match {
+      case ">" => Gt(f, s)
+      case ">=" => Or(Gt(f, s), Eq(f, s))
+      case "<" => Lt(f, s)
+      case "<=" => Or(Lt(f, s), Eq(f, s))
     }
   }
 
+  val _between = identWithDot ~ (between ~> stringLiteral <~ and) ~ 
stringLiteral ^^ {
+    case f ~ minV ~ maxV => Between(f, minV, maxV)
+  }
+
+  val _in = identWithDot ~ (notIn | in) ~ ("(" ~> repsep(stringLiteral, ",") 
<~ ")") ^^ {
+    case f ~ op ~ values =>
+      val inClause =
+        if (f.startsWith("_parent")) IN(f, values.toSet)
+        else InWithoutParent(label, f, values.toSet)
+
+      if (op.toLowerCase == "in") inClause
+      else Not(inClause)
+  }
+
+  def predicate =  _eq | _ltGt | _between | _in
+
   def parse(sql: String): Try[Where] = Try {
     parseAll(where, sql) match {
       case Success(r, q) => r

Reply via email to