http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index d77ac7d..5466a9a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -19,57 +19,82 @@
 
 package org.apache.s2graph.core.rest
 
+
 import java.util.concurrent.{Callable, TimeUnit}
+
 import com.google.common.cache.CacheBuilder
-import com.typesafe.config.Config
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, 
ModelNotFoundException}
+import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.JSONParser._
 import play.api.libs.json._
-import play.api.libs.json.Reads._
 
-import scala.util.{Failure, Success, Try}
+
+import scala.util.{Random, Failure, Success, Try}
 
 object TemplateHelper {
   val findVar = """\"?\$\{(.*?)\}\"?""".r
-  val num = 
"""(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
+  val num = 
"""(next_minute|next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(minute|hour|day|week)?""".r
+  val randIntRegex = """randint\((.*,.*)\)""".r
 
-  val hour = 60 * 60 * 1000L
-  val day = hour * 24L
-  val week = day * 7L
+  // Durations expressed in milliseconds; each unit is built from the next smaller one.
+  val minute: Long = 60L * 1000L
+  val hour: Long = minute * 60L
+  val day: Long = hour * 24L
+  val week: Long = day * 7L
 
   def calculate(now: Long, n: Int, unit: String): Long = {
     val duration = unit match {
+      case "minute" | "MINUTE" => n * minute
       case "hour" | "HOUR" => n * hour
       case "day" | "DAY" => n * day
       case "week" | "WEEK" => n * week
-      case _ => n * day
+      case _ => n
     }
 
     duration + now
   }
 
+  /**
+    * Draws a random integer from the inclusive range described by `s`.
+    *
+    * @param s range written as "from,to"; both bounds must parse as `Int`
+    *          (surrounding whitespace is ignored) and `from` must not exceed `to`
+    * @return a value `v` with `from <= v <= to`
+    * @throws RuntimeException if `s` does not hold exactly two integer bounds,
+    *                          or if `from > to`
+    */
+  def randInt(s: String): Long = {
+    val message = s"TemplateHelper.randint has wrong format. $s"
+
+    val tokens = s.split(",").map(_.trim)
+    if (tokens.length != 2) throw new RuntimeException(message)
+    val (from, to) = try {
+      (tokens.head.toInt, tokens.last.toInt)
+    } catch {
+      case e: NumberFormatException => throw new RuntimeException(message, e)
+    }
+    if (from > to) throw new RuntimeException(message)
+
+    // Widen to Long before subtracting: the number of candidates reaches 2^32 for the
+    // full Int range, which overflows Int arithmetic and used to hand Random.nextInt a
+    // non-positive bound.
+    val size = to.toLong - from.toLong + 1L
+    // `%` keeps the sign of the dividend, so shift negative remainders into [0, size).
+    val remainder = Random.nextLong() % size
+    val offset = if (remainder < 0L) remainder + size else remainder
+    from.toLong + offset
+  }
+
   def replaceVariable(now: Long, body: String): String = {
     findVar.replaceAllIn(body, m => {
       val matched = m group 1
+      randIntRegex.findFirstMatchIn(matched) match {
+        case None =>
+          num.replaceSomeIn(matched, m => {
+            val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3))
+            val ts = _pivot match {
+              case null => now
+              case "now" | "NOW" => now
+              case "next_minute" | "NEXT_MINUTE" => now / minute * minute + 
minute
+              case "next_week" | "NEXT_WEEK" => now / week * week + week
+              case "next_day" | "NEXT_DAY" => now / day * day + day
+              case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour
+            }
 
-      num.replaceSomeIn(matched, m => {
-        val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3))
-        val ts = _pivot match {
-          case null => now
-          case "now" | "NOW" => now
-          case "next_week" | "NEXT_WEEK" => now / week * week + week
-          case "next_day" | "NEXT_DAY" => now / day * day + day
-          case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour
-        }
-
-        if (_pivot == null && n == null && unit == null) None
-        else if (n == null || unit == null) Option(ts.toString)
-        else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString)
-      })
+            if (_pivot == null && n == null && unit == null) None
+            else if (n == null || unit == null) Option(ts.toString)
+            else Option(calculate(ts, n.replaceAll(" ", "").toInt, 
unit).toString)
+          })
+        case Some(m) =>
+          val range = m group 1
+          randInt(range).toString
+      }
     })
   }
 }
@@ -107,37 +132,37 @@ class RequestParser(graph: Graph) {
   val DefaultCompressionAlgorithm = 
config.getString("hbase.table.compression.algorithm")
   val DefaultPhase = config.getString("phase")
   val parserCache = CacheBuilder.newBuilder()
-    .expireAfterAccess(10000, TimeUnit.MILLISECONDS)
-    .expireAfterWrite(10000, TimeUnit.MILLISECONDS)
-    .maximumSize(10000)
-    .initialCapacity(1000)
-    .build[String, Try[Where]]
+      .expireAfterAccess(10000, TimeUnit.MILLISECONDS)
+      .expireAfterWrite(10000, TimeUnit.MILLISECONDS)
+      .maximumSize(10000)
+      .initialCapacity(1000)
+      .build[String, Try[Where]]
 
-  private def extractScoring(labelId: Int, value: JsValue) = {
+  private def extractScoring(label: Label, value: JsValue): 
Option[Seq[(LabelMeta, Double)]] = {
     val ret = for {
       js <- parseOption[JsObject](value, "scoring")
     } yield {
       for {
         (k, v) <- js.fields
-        labelOrderType <- LabelMeta.findByName(labelId, k)
+        labelMata <- label.metaPropsInvMap.get(k)
       } yield {
         val value = v match {
           case n: JsNumber => n.as[Double]
           case _ => throw new Exception("scoring weight should be double.")
         }
-        (labelOrderType.seq, value)
+        (labelMata, value)
       }
     }
 
     ret
   }
 
-  def extractInterval(label: Label, jsValue: JsValue) = {
-    def extractKv(js: JsValue) = js match {
-      case JsObject(map) => map.toSeq
+  def extractInterval(label: Label, jsValue: JsValue): Option[(Seq[(String, 
JsValue)], Seq[(String, JsValue)])] = {
+    def extractKv(js: JsValue): Seq[(String, JsValue)] = js match {
+      case JsObject(obj) => obj.toSeq
       case JsArray(arr) => arr.flatMap {
-        case JsObject(map) => map.toSeq
-        case _ => throw new RuntimeException(s"cannot support json type: $js")
+        case JsObject(obj) => obj.toSeq
+        case _ => throw new RuntimeException(s"cannot support json type $js")
       }
       case _ => throw new RuntimeException(s"cannot support json type: $js")
     }
@@ -147,8 +172,8 @@ class RequestParser(graph: Graph) {
       fromJs <- (js \ "from").asOpt[JsValue]
       toJs <- (js \ "to").asOpt[JsValue]
     } yield {
-      val from = Management.toProps(label, extractKv(fromJs))
-      val to = Management.toProps(label, extractKv(toJs))
+      val from = extractKv(fromJs)
+      val to = extractKv(toJs)
       (from, to)
     }
 
@@ -188,10 +213,10 @@ class RequestParser(graph: Graph) {
         labelMeta <- LabelMeta.findByName(label.id.get, k)
         value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion)
       } yield {
-        labelMeta.seq -> value
+        labelMeta.name -> value
       }
     }
-    ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
+    ret.map(_.toMap).getOrElse(Map.empty[String, InnerValLike])
   }
 
   def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = 
{
@@ -214,6 +239,28 @@ class RequestParser(graph: Graph) {
     }
   }
 
+  /**
+    * Parses the optional "groupBy" clause of a query or step.
+    *
+    * Two JSON shapes are understood:
+    *  - an object with "keys" (array of strings, default empty), "limit" (int, default
+    *    `hardLimit`) and an optional "minimumShouldMatch" object carrying "prop"
+    *    (default "to"), "count" (default 0) and "terms" (strings or numbers, default empty);
+    *  - a bare array of strings, used as the keys.
+    *
+    * @param value the raw "groupBy" JSON, if the request carried one
+    * @return the parsed group-by; `GroupBy.Empty` when `value` is absent or is any other
+    *         JSON type
+    * @throws RuntimeException if a "terms" entry is neither a string nor a number
+    */
+  def extractGroupBy(value: Option[JsValue]): GroupBy = {
+    // Unwraps one "terms" entry into the plain value it carries.
+    def toTerm(js: JsValue): Any = js match {
+      case JsString(s) => s
+      case JsNumber(n) => n
+      case _ => throw new RuntimeException("not supported data type")
+    }
+
+    value match {
+      case Some(obj: JsObject) =>
+        val keys = (obj \ "keys").asOpt[Seq[String]].getOrElse(Nil)
+        val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
+        val minShouldMatchOpt = (obj \ "minimumShouldMatch").asOpt[JsObject].map { o =>
+          val prop = (o \ "prop").asOpt[String].getOrElse("to")
+          val count = (o \ "count").asOpt[Int].getOrElse(0)
+          val terms = (o \ "terms").asOpt[Set[JsValue]].getOrElse(Set.empty[JsValue]).map(toTerm)
+
+          MinShouldMatchParam(prop, count, terms)
+        }
+
+        GroupBy(keys, groupByLimit, minShouldMatch = minShouldMatchOpt)
+      case Some(arr: JsArray) =>
+        GroupBy(arr.asOpt[Seq[String]].getOrElse(Nil))
+      // Strings, numbers, booleans and null are not a group-by spec. Without this branch
+      // they raise a MatchError instead of falling back to "no grouping".
+      case _ => GroupBy.Empty
+    }
+  }
+
   def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): 
Seq[Vertex] = {
     val vertices = for {
       label <- Label.findByName(labelName).toSeq
@@ -228,33 +275,40 @@ class RequestParser(graph: Graph) {
   }
 
   def toMultiQuery(jsValue: JsValue, impIdOpt: Option[String]): MultiQuery = {
+    val globalQueryOption = toQueryOption(jsValue, impIdOpt)
     val queries = for {
       queryJson <- (jsValue \ 
"queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
     } yield {
-      toQuery(queryJson, impIdOpt = impIdOpt)
+      val innerQuery = toQuery(queryJson, impIdOpt = impIdOpt)
+      val queryOption = innerQuery.queryOption
+
+      if (queryOption.groupBy.keys.nonEmpty) throw new 
BadQueryException("Group by option is not allowed in multiple queries.")
+      if (queryOption.orderByKeys.nonEmpty) throw new BadQueryException("Order 
by option is not allowed in multiple queries.")
+
+      if (globalQueryOption.withScore) innerQuery.copy(queryOption = 
innerQuery.queryOption.copy(withScore = false))
+      else innerQuery
+      //        val innerQuery3 =
+      //          if (globalQueryOption.groupBy.keys.nonEmpty) 
innerQuery2.copy(queryOption = innerQuery2.queryOption.copy(groupBy = 
GroupBy.Empty))
+      //          else innerQuery2
+
     }
     val weights = (jsValue \ 
"weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
-    MultiQuery(queries = queries, weights = weights, queryOption = 
toQueryOption(jsValue, impIdOpt))
+    MultiQuery(queries = queries, weights = weights, queryOption = 
globalQueryOption)
   }
 
+
   def toQueryOption(jsValue: JsValue, impIdOpt: Option[String]): QueryOption = 
{
     val filterOutFields = (jsValue \ 
"filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
-    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => 
toQuery(v, impIdOpt = impIdOpt) }.map { q =>
-      q.copy(queryOption = q.queryOption.copy(filterOutFields = 
filterOutFields))
+    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v =>
+      toQuery(v, impIdOpt = impIdOpt)
+    }.map { q =>
+      q.copy(queryOption = q.queryOption.copy(filterOutFields = 
filterOutFields, selectColumns = filterOutFields))
     }
     val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
     val selectColumns = (jsValue \ 
"select").asOpt[List[String]].getOrElse(List.empty)
-//    val groupByColumns = (jsValue \ 
"groupBy").asOpt[List[String]].getOrElse(List.empty)
-    val groupBy = (jsValue \ "groupBy").asOpt[JsValue].getOrElse(JsNull) match 
{
-      case obj: JsObject =>
-        val keys = (obj \ "key").asOpt[Seq[String]].getOrElse(Nil)
-        val groupByLimit = (obj \ "limit").asOpt[Int].getOrElse(hardLimit)
-        GroupBy(keys, groupByLimit)
-      case arr: JsArray =>
-        val keys = arr.asOpt[Seq[String]].getOrElse(Nil)
-        GroupBy(keys)
-      case _ => GroupBy.Empty
-    }
+
+    val groupBy = extractGroupBy((jsValue \ "groupBy").asOpt[JsValue])
+
     val orderByColumns: List[(String, Boolean)] = (jsValue \ 
"orderBy").asOpt[List[JsObject]].map { jsLs =>
       for {
         js <- jsLs
@@ -266,7 +320,7 @@ class RequestParser(graph: Graph) {
         }
         column -> ascending
       }
-    }.getOrElse(List("score" -> false, "timestamp" -> false))
+    }.getOrElse(Nil)
     val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true)
     val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false)
     //TODO: Refactor this
@@ -274,6 +328,8 @@ class RequestParser(graph: Graph) {
     val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true)
     val scoreThreshold = (jsValue \ 
"scoreThreshold").asOpt[Double].getOrElse(Double.MinValue)
     val returnDegree = (jsValue \ 
"returnDegree").asOpt[Boolean].getOrElse(true)
+    val ignorePrevStepCache = (jsValue \ 
"ignorePrevStepCache").asOpt[Boolean].getOrElse(false)
+    val shouldPropagateScore = (jsValue \ 
"shouldPropagateScore").asOpt[Boolean].getOrElse(true)
 
     QueryOption(removeCycle = removeCycle,
       selectColumns = selectColumns,
@@ -287,34 +343,25 @@ class RequestParser(graph: Graph) {
       returnAgg = returnAgg,
       scoreThreshold = scoreThreshold,
       returnDegree = returnDegree,
-      impIdOpt = impIdOpt
+      impIdOpt = impIdOpt,
+      ignorePrevStepCache,
+      shouldPropagateScore
     )
   }
 
   def toQuery(jsValue: JsValue, impIdOpt: Option[String]): Query = {
     try {
-      val vertices =
-        (for {
-          value <- parse[List[JsValue]](jsValue, "srcVertices")
-          serviceName = parse[String](value, "serviceName")
-          column = parse[String](value, "columnName")
-        } yield {
-          val service = Service.findByName(serviceName).getOrElse(throw 
BadQueryException("service not found"))
-          val col = ServiceColumn.find(service.id.get, column).getOrElse(throw 
BadQueryException("bad column name"))
-          val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ 
"ids").asOpt[List[JsValue]])
-          for {
-            idVal <- idOpt ++ idsOpt.toSeq.flatten
-
-            /* bug, need to use labels schemaVersion  */
-            innerVal <- jsValueToInnerVal(idVal, col.columnType, 
col.schemaVersion)
-          } yield {
-            Vertex(SourceVertexId(col.id.get, innerVal), 
System.currentTimeMillis())
-          }
-        }).flatten
+      val vertices = for {
+        value <- (jsValue \ "srcVertices").asOpt[Seq[JsValue]].getOrElse(Nil)
+        serviceName <- (value \ "serviceName").asOpt[String].toSeq
+        columnName <- (value \ "columnName").asOpt[String].toSeq
+        idJson = (value \ "id").asOpt[JsValue].map(Seq(_)).getOrElse(Nil)
+        idsJson = (value \ "ids").asOpt[Seq[JsValue]].getOrElse(Nil)
+        id <- (idJson ++ idsJson).flatMap(jsValueToAny(_).toSeq).distinct
+      } yield Vertex.toVertex(serviceName, columnName, id)
 
       if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is 
empty")
       val steps = parse[Vector[JsValue]](jsValue, "steps")
-
       val queryOption = toQueryOption(jsValue, impIdOpt)
 
       val querySteps =
@@ -350,32 +397,35 @@ class RequestParser(graph: Graph) {
           val queryParams =
             for {
               labelGroup <- queryParamJsVals
-              queryParam <- parseQueryParam(labelGroup)
+              queryParam <- parseQueryParam(labelGroup, queryOption)
             } yield {
               val (_, columnName) =
-                if (queryParam.labelWithDir.dir == 
GraphUtil.directions("out")) {
+                if (queryParam.dir == GraphUtil.directions("out")) {
                   (queryParam.label.srcService.serviceName, 
queryParam.label.srcColumnName)
                 } else {
                   (queryParam.label.tgtService.serviceName, 
queryParam.label.tgtColumnName)
                 }
               //FIXME:
-              if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => 
v.serviceColumn.columnName == columnName)) {
+              if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => 
v.columnName == columnName)) {
                 throw BadQueryException("srcVertices contains incompatiable 
serviceName or columnName with first step.")
               }
 
               queryParam
             }
-          Step(queryParams.toList, labelWeights = labelWeights,
-            //            scoreThreshold = stepThreshold,
+
+
+          val groupBy = extractGroupBy((step \ "groupBy").asOpt[JsValue])
+
+          Step(queryParams = queryParams,
+            labelWeights = labelWeights,
             nextStepScoreThreshold = nextStepScoreThreshold,
             nextStepLimit = nextStepLimit,
-            cacheTTL = cacheTTL)
+            cacheTTL = cacheTTL,
+            groupBy = groupBy)
 
         }
 
-      val ret = Query(vertices, querySteps, queryOption)
-      //      logger.debug(ret.toString)
-      ret
+      Query(vertices, querySteps, queryOption)
     } catch {
       case e: BadQueryException =>
         throw e
@@ -386,12 +436,12 @@ class RequestParser(graph: Graph) {
     }
   }
 
-  private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = {
+  private def parseQueryParam(labelGroup: JsValue, queryOption: QueryOption): 
Option[QueryParam] = {
     for {
       labelName <- parseOption[String](labelGroup, "label")
     } yield {
       val label = Label.findByName(labelName).getOrElse(throw 
BadQueryException(s"$labelName not found"))
-      val direction = parseOption[String](labelGroup, 
"direction").map(GraphUtil.toDirection(_)).getOrElse(0)
+      val direction = parseOption[String](labelGroup, 
"direction").getOrElse("out")
       val limit = {
         parseOption[Int](labelGroup, "limit") match {
           case None => defaultLimit
@@ -402,28 +452,24 @@ class RequestParser(graph: Graph) {
       val offset = parseOption[Int](labelGroup, "offset").getOrElse(0)
       val interval = extractInterval(label, labelGroup)
       val duration = extractDuration(label, labelGroup)
-      val scoring = extractScoring(label.id.get, 
labelGroup).getOrElse(List.empty[(Byte, Double)]).toList
+      val scoring = extractScoring(label, labelGroup).getOrElse(Nil).toList
       val exclude = parseOption[Boolean](labelGroup, 
"exclude").getOrElse(false)
       val include = parseOption[Boolean](labelGroup, 
"include").getOrElse(false)
       val hasFilter = extractHas(label, labelGroup)
-      val labelWithDir = LabelWithDirection(label.id.get, direction)
-      val indexNameOpt = (labelGroup \ "index").asOpt[String]
-      val indexSeq = indexNameOpt match {
-        case None => label.indexSeqsMap.get(scoring.map(kv => 
kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq)
-        case Some(indexName) => 
label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new 
RuntimeException("cannot find index"))
-      }
+
+      val indexName = (labelGroup \ 
"index").asOpt[String].getOrElse(LabelIndex.DefaultName)
       val whereClauseOpt = (labelGroup \ "where").asOpt[String]
       val where = extractWhere(label, whereClauseOpt)
       val includeDegree = (labelGroup \ 
"includeDegree").asOpt[Boolean].getOrElse(true)
       val rpcTimeout = (labelGroup \ 
"rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout)
       val maxAttempt = (labelGroup \ 
"maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt)
-      val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { 
jsVal =>
-        jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, 
label.schemaVersion)
-      }
+
+      val tgtVertexInnerIdOpt = (labelGroup \ 
"_to").asOpt[JsValue].filterNot(_ == JsNull).flatMap(jsValueToAny)
+
       val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L)
       val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { 
jsVal =>
         val propName = (jsVal \ 
"propName").asOpt[String].getOrElse(LabelMeta.timestamp.name)
-        val propNameSeq = 
label.metaPropsInvMap.get(propName).map(_.seq).getOrElse(LabelMeta.timeStampSeq)
+        val propNameSeq = 
label.metaPropsInvMap.get(propName).getOrElse(LabelMeta.timestamp)
         val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0)
         val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1)
         if (decayRate >= 1.0 || decayRate <= 0.0) throw new 
BadQueryException("decay rate should be 0.0 ~ 1.0")
@@ -432,40 +478,36 @@ class RequestParser(graph: Graph) {
       }
       val threshold = (labelGroup \ 
"threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
       // TODO: refactor this. dirty
-      val duplicate = parseOption[String](labelGroup, "duplicate").map(s => 
Query.DuplicatePolicy(s))
+      val duplicate = parseOption[String](labelGroup, "duplicate").map(s => 
DuplicatePolicy(s)).getOrElse(DuplicatePolicy.First)
 
       val outputField = (labelGroup \ "outputField").asOpt[String].map(s => 
Json.arr(Json.arr(s)))
-      val transformer = if (outputField.isDefined) outputField else 
(labelGroup \ "transform").asOpt[JsValue]
+      val transformer = (if (outputField.isDefined) outputField else 
(labelGroup \ "transform").asOpt[JsValue]) match {
+        case None => EdgeTransformer(EdgeTransformer.DefaultJson)
+        case Some(json) => EdgeTransformer(json)
+      }
       val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
       val scorePropagateShrinkage = (labelGroup \ 
"scorePropagateShrinkage").asOpt[Long].getOrElse(500l)
       val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
       val shouldNormalize = (labelGroup \ 
"normalize").asOpt[Boolean].getOrElse(false)
       val cursorOpt = (labelGroup \ "cursor").asOpt[String]
       // FIXME: Order of command matter
-      QueryParam(labelWithDir)
-        .sample(sample)
-        .rank(RankParam(label.id.get, scoring))
-        .exclude(exclude)
-        .include(include)
-        .duration(duration)
-        .has(hasFilter)
-        .labelOrderSeq(indexSeq)
-        .interval(interval)
-        .limit(offset, limit)
-        .where(where)
-        .duplicatePolicy(duplicate)
-        .includeDegree(includeDegree)
-        .rpcTimeout(rpcTimeout)
-        .maxAttempt(maxAttempt)
-        .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt)
-        .cacheTTLInMillis(cacheTTL)
-        .timeDecay(timeDecayFactor)
-        .threshold(threshold)
-        .transformer(transformer)
-        .scorePropagateOp(scorePropagateOp)
-        .scorePropagateShrinkage(scorePropagateShrinkage)
-        .shouldNormalize(shouldNormalize)
-        .cursorOpt(cursorOpt)
+      QueryParam(labelName = labelName,
+        direction = direction,
+        offset = offset,
+        limit = limit,
+        sample = sample,
+        maxAttempt = maxAttempt,
+        rpcTimeout = rpcTimeout,
+        cacheTTLInMillis = cacheTTL,
+        indexName = indexName, where = where, threshold = threshold,
+        rank = RankParam(scoring), intervalOpt = interval, durationOpt = 
duration,
+        exclude = exclude, include = include, has = hasFilter, duplicatePolicy 
= duplicate,
+        includeDegree = includeDegree, scorePropagateShrinkage = 
scorePropagateShrinkage,
+        scorePropagateOp = scorePropagateOp, shouldNormalize = shouldNormalize,
+        whereRawOpt = whereClauseOpt, cursorOpt = cursorOpt,
+        tgtVertexIdOpt = tgtVertexInnerIdOpt,
+        edgeTransformer = transformer, timeDecay = timeDecayFactor
+      )
     }
   }
 
@@ -489,6 +531,11 @@ class RequestParser(graph: Graph) {
     }
   }
 
+  /**
+    * Renders a JSON value as text.
+    *
+    * @param js the value to render
+    * @return the raw content for a JSON string, otherwise the value's `toString()` form
+    */
+  def jsToStr(js: JsValue): String = js match {
+    case str: JsString => str.value
+    case other => other.toString()
+  }
+
   def parseBulkFormat(str: String): Seq[(GraphElement, String)] = {
     val edgeStrs = str.split("\\n").filterNot(_.isEmpty)
     val elementsWithTsv = for {
@@ -624,30 +671,15 @@ class RequestParser(graph: Graph) {
   }
 
   def toCheckEdgeParam(jsValue: JsValue) = {
-    val params = jsValue.as[List[JsValue]]
-    var isReverted = false
-    val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]()
-    val quads = for {
-      param <- params
-      labelName <- (param \ "label").asOpt[String]
-      direction <- GraphUtil.toDir((param \ 
"direction").asOpt[String].getOrElse("out"))
-      label <- Label.findByName(labelName)
-      srcId <- jsValueToInnerVal((param \ "from").as[JsValue], 
label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
-      tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], 
label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
+    for {
+      json <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
+      from <- (json \ "from").asOpt[JsValue].flatMap(jsValueToAny(_))
+      to <- (json \ "to").asOpt[JsValue].flatMap(jsValueToAny(_))
+      labelName <- (json \ "label").asOpt[String]
+      direction = (json \ "direction").asOpt[String].getOrElse("out")
     } yield {
-      val labelWithDir = LabelWithDirection(label.id.get, direction)
-      labelWithDirs += labelWithDir
-      val (src, tgt, dir) = if (direction == 1) {
-        isReverted = true
-        (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, 
tgtId)),
-          Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, 
srcId)), 0)
-      } else {
-        (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, 
srcId)),
-          Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, 
tgtId)), 0)
-      }
-      (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
+      Edge.toEdge(from, to, labelName, direction, Map.empty)
     }
-    (quads, isReverted)
   }
 
   def toGraphElements(str: String): Seq[GraphElement] = {
@@ -671,16 +703,6 @@ class RequestParser(graph: Graph) {
     (labels, direction, ids, ts, vertices)
   }
 
-  def toFetchAndDeleteParam(json: JsValue) = {
-    val labelName = (json \ "label").as[String]
-    val fromOpt = (json \ "from").asOpt[JsValue]
-    val toOpt = (json \ "to").asOpt[JsValue]
-    val direction = (json \ "direction").asOpt[String].getOrElse("out")
-    val indexOpt = (json \ "index").asOpt[String]
-    val propsOpt = (json \ "props").asOpt[JsObject]
-    (labelName, fromOpt, toOpt, direction, indexOpt, propsOpt)
-  }
-
   def parseExperiment(jsQuery: JsValue): Seq[ExperimentParam] = 
jsQuery.as[Seq[JsObject]].map { obj =>
     def _require(field: String) = throw new RuntimeException(s"${field} not 
found")
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 4c77ad6..099a7f9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -29,6 +29,8 @@ import org.apache.s2graph.core.utils.logger
 import play.api.libs.json._
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+import scala.util.control.NonFatal
 
 object RestHandler {
   trait CanLookup[A] {
@@ -56,7 +58,7 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
 
   import RestHandler._
   val requestParser = new RequestParser(graph)
-
+  val querySampleRate: Double = graph.config.getDouble("query.log.sample.rate")
 
   /**
     * Public APIS
@@ -69,15 +71,8 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
       val jsQuery = Json.parse(body)
 
       uri match {
-//        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, 
impIdOpt)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, 
impIdOpt)(PostProcess.toJson))
-//        case "/graphs/getEdges/grouped" => 
HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
-//        case "/graphs/getEdgesExcluded" => 
HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-//        case "/graphs/getEdgesExcluded/grouped" => 
HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
+        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery, 
impIdOpt)(PostProcess.toJson(Option(jsQuery))))
         case "/graphs/checkEdges" => checkEdges(jsQuery)
-//        case "/graphs/getEdgesGrouped" => 
HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
-//        case "/graphs/getEdgesGroupedExcluded" => 
HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
-//        case "/graphs/getEdgesGroupedExcludedFormatted" => 
HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
         case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
         case "/graphs/experiments" => experiments(jsQuery)
         case uri if uri.startsWith("/graphs/experiment") =>
@@ -93,17 +88,27 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
   // TODO: Refactor to doGet
   def checkEdges(jsValue: JsValue): HandlerResult = {
     try {
-      val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
-
-      HandlerResult(graph.checkEdges(quads).map { case stepResult =>
-        PostProcess.toJson(graph, QueryOption(), stepResult)
+      val edges = requestParser.toCheckEdgeParam(jsValue)
+
+      HandlerResult(graph.checkEdges(edges).map { case stepResult =>
+        val jsArray = for {
+          s2EdgeWithScore <- stepResult.edgeWithScores
+//          json <- PostProcess.s2EdgeToJsValue(QueryOption(), s2EdgeWithScore)
+          json = PostProcess.s2EdgeToJsValue(QueryOption(), s2EdgeWithScore)
+        } yield json
+        Json.toJson(jsArray)
       })
     } catch {
-      case e: Exception => HandlerResult(Future.failed(e))
+      case e: Exception =>
+        logger.error(s"RestHandler#checkEdges error: $e")
+        HandlerResult(Future.failed(e))
     }
   }
 
 
+  /**
+    * Private APIS
+    */
   private def experiments(jsQuery: JsValue): HandlerResult = {
     val params: Seq[RequestParser.ExperimentParam] = 
requestParser.parseExperiment(jsQuery)
 
@@ -120,7 +125,7 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
     HandlerResult(body = result)
   }
 
-  private def experiment(contentsBody: JsValue, accessToken: String, 
experimentName: String, uuid: String, impKeyOpt: => Option[String] = None): 
HandlerResult = {
+  def experiment(contentsBody: JsValue, accessToken: String, experimentName: 
String, uuid: String, impKeyOpt: => Option[String] = None): HandlerResult = {
     try {
       val bucketOpt = for {
         service <- Service.findByAccessToken(accessToken)
@@ -128,9 +133,21 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
         bucket <- experiment.findBucket(uuid, impKeyOpt)
       } yield bucket
 
-      val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is 
not found"))
+      val bucket = bucketOpt.getOrElse(throw new RuntimeException(s"bucket is 
not found. $accessToken, $experimentName, $uuid, $impKeyOpt"))
       if (bucket.isGraphQuery) {
         val ret = buildRequestInner(contentsBody, bucket, uuid)
+
+        logQuery(Json.obj(
+          "type" -> "experiment",
+          "time" -> System.currentTimeMillis(),
+          "body" -> contentsBody,
+          "uri" -> Seq("graphs", "experiment", accessToken, experimentName, 
uuid).mkString("/"),
+          "accessToken" -> accessToken,
+          "experimentName" -> experimentName,
+          "uuid" -> uuid,
+          "impressionId" -> bucket.impressionId
+        ))
+
         HandlerResult(ret.body, Experiment.ImpressionKey -> 
bucket.impressionId)
       }
       else throw new RuntimeException("not supported yet")
@@ -156,24 +173,39 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
 
   def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None)
                    (post: (Graph, QueryOption, StepResult) => JsValue): 
Future[JsValue] = {
-    jsonQuery match {
-      case obj@JsObject(_) =>
-        (obj \ "queries").asOpt[JsValue] match {
-          case None =>
-            val query = requestParser.toQuery(obj, impIdOpt)
-            graph.getEdges(query).map(post(graph, query.queryOption, _))
-          case _ =>
-            val multiQuery = requestParser.toMultiQuery(obj, impIdOpt)
-            graph.getEdgesMultiQuery(multiQuery).map(post(graph, 
multiQuery.queryOption, _))
-        }
-
-      case JsArray(arr) =>
-        val queries = arr.map(requestParser.toQuery(_, impIdOpt))
-        val weights = queries.map(_ => 1.0)
-        val multiQuery = MultiQuery(queries, weights, QueryOption(), jsonQuery)
-        graph.getEdgesMultiQuery(multiQuery).map(post(graph, 
multiQuery.queryOption, _))
-
-      case _ => throw BadQueryException("Cannot support")
+
+    def query(obj: JsValue): Future[JsValue] = {
+      (obj \ "queries").asOpt[JsValue] match {
+        case None =>
+          val s2Query = requestParser.toQuery(obj, impIdOpt)
+          graph.getEdges(s2Query).map(post(graph, s2Query.queryOption, _))
+        case _ =>
+          val multiQuery = requestParser.toMultiQuery(obj, impIdOpt)
+          graph.getEdgesMultiQuery(multiQuery).map(post(graph, 
multiQuery.queryOption, _))
+      }
+    }
+
+    logQuery(Json.obj(
+      "type" -> "getEdges",
+      "time" -> System.currentTimeMillis(),
+      "body" -> jsonQuery,
+      "uri" -> "graphs/getEdges"
+    ))
+
+    val unionQuery = (jsonQuery \ "union").asOpt[JsObject]
+    unionQuery match {
+      case None => jsonQuery match {
+        case obj@JsObject(_) => query(obj)
+        case JsArray(arr) =>
+          val res = arr.map(js => query(js.as[JsObject]))
+          Future.sequence(res).map(JsArray)
+        case _ => throw BadQueryException("Cannot support")
+      }
+
+      case Some(jsUnion) =>
+        val (keys, queries) = jsUnion.value.unzip
+        val futures = queries.map(query)
+        Future.sequence(futures).map(res => JsObject(keys.zip(res).toSeq))
     }
   }
 
@@ -194,7 +226,6 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
     graph.getVertices(vertices) map { vertices => 
PostProcess.verticesToJson(vertices) }
   }
 
-
   private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: 
Bucket, uuid: String): String = {
     var body = bucket.requestBody.replace("#uuid", uuid)
 
@@ -203,19 +234,24 @@ class RestHandler(graph: Graph)(implicit ec: 
ExecutionContext) {
       jsObj <- requestKeyJson.asOpt[JsObject]
       (key, value) <- jsObj.fieldSet
     } {
+      val escaped = Json.stringify(value)
       val replacement = value match {
-        case JsString(s) => s
-        case _ => value.toString
+        case _: JsString => escaped.slice(1, escaped.length - 1)
+        case _ => escaped
       }
+
       body = body.replace(key, replacement)
     }
 
     body
   }
 
-  def calcSize(js: JsValue): Int = js match {
-    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
-    case JsArray(seq) => seq.map(js => (js \ 
"size").asOpt[Int].getOrElse(0)).sum
-    case _ => 0
+  def calcSize(js: JsValue): Int = // sum of every "size" field found by the recursive `\\` lookup; non-Int values count as 0
+    (js \\ "size").map(sizeJs => sizeJs.asOpt[Int].getOrElse(0)).sum
+
+  def logQuery(queryJson: => JsObject): Unit = {
+    val sampled = scala.util.Random.nextDouble() < querySampleRate
+    // queryJson is by-name, so it is only built and stringified when this call is sampled
+    if (sampled) logger.query(queryJson.toString)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index c76c25c..403ceeb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -24,24 +24,29 @@ import org.hbase.async.KeyValue
 
 
 object SKeyValue {
+  val EdgeCf = "e".getBytes()
   val Put = 1
   val Delete = 2
   val Increment = 3
   val Default = Put
 }
+
 case class SKeyValue(table: Array[Byte],
                      row: Array[Byte],
                      cf: Array[Byte],
                      qualifier: Array[Byte],
                      value: Array[Byte],
                      timestamp: Long,
-                     operation: Int = SKeyValue.Default) {
+                     operation: Int = SKeyValue.Default,
+                     durability: Boolean = true) {
   def toLogString = {
-    Map("table" -> table.toList, "row" -> row.toList, "cf" -> 
Bytes.toString(cf),
+    Map("table" -> Bytes.toString(table), "row" -> row.toList, "cf" -> 
Bytes.toString(cf),
       "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> 
timestamp,
-      "operation" -> operation).mapValues(_.toString).toString
+      "operation" -> operation, "durability" -> durability).toString
   }
   override def toString(): String = toLogString
+
+  def toKeyValue: KeyValue = new KeyValue(row, cf, qualifier, timestamp, value)
 }
 
 trait CanSKeyValue[T] {
@@ -64,3 +69,4 @@ object CanSKeyValue {
 
   // For hbase KeyValues
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index a6e81b4..07e39aa 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -19,53 +19,44 @@
 
 package org.apache.s2graph.core.storage
 
-import java.util.concurrent.{Executors, TimeUnit}
 
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
+import org.apache.s2graph.core.GraphExceptions.{NoStackException, 
FetchTimeoutException}
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.parsers.WhereParser
 import 
org.apache.s2graph.core.storage.serde.indexedge.wide.{IndexEdgeDeserializable, 
IndexEdgeSerializable}
 import 
org.apache.s2graph.core.storage.serde.snapshotedge.wide.SnapshotEdgeDeserializable
 import org.apache.s2graph.core.storage.serde.vertex.{VertexDeserializable, 
VertexSerializable}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.{Extensions, logger}
+import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
 
-import scala.annotation.tailrec
-import scala.collection.Seq
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Random, Try}
+import java.util.concurrent.{Executors, TimeUnit}
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.hadoop.hbase.util.Bytes
+
 
-abstract class Storage[R](val graph: Graph,
+abstract class Storage[Q, R](val graph: Graph,
                           val config: Config)(implicit ec: ExecutionContext) {
   import HBaseType._
+  import Graph._
 
-  /** storage dependent configurations */
-  val DeleteAllFetchCount = config.getInt("delete.all.fetch.count")
-  val MaxRetryNum = config.getInt("max.retry.number")
-  val MaxBackOff = config.getInt("max.back.off")
-  val BackoffTimeout = config.getInt("back.off.timeout")
-  val DeleteAllFetchSize = config.getInt("delete.all.fetch.size")
-  val FailProb = config.getDouble("hbase.fail.prob")
-  val LockExpireDuration = config.getInt("lock.expire.time")
-  val MaxSize = config.getInt("future.cache.max.size")
-  val ExpireAfterWrite = config.getInt("future.cache.expire.after.write")
-  val ExpireAfterAccess = config.getInt("future.cache.expire.after.access")
+  val BackoffTimeout = graph.BackoffTimeout
+  val MaxRetryNum = graph.MaxRetryNum
+  val MaxBackOff = graph.MaxBackOff
+  val FailProb = graph.FailProb
+  val LockExpireDuration =  graph.LockExpireDuration
+  val MaxSize = graph.MaxSize
+  val ExpireAfterWrite = graph.ExpireAfterWrite
+  val ExpireAfterAccess = graph.ExpireAfterAccess
 
   /** retry scheduler */
   val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
 
 
-  /** handle mutate failed */
-  val exceptionHandler = new ExceptionHandler(config)
-  val failTopic = s"mutateFailed_${config.getString("phase")}"
-
-  /** fallback */
-  val fallback = Future.successful(StepResult.Empty)
-  val innerFallback = Future.successful(StepInnerResult.Empty)
-
   /**
    * Compatibility table
    * | label schema version | snapshot edge | index edge | vertex | note |
@@ -109,7 +100,7 @@ abstract class Storage[R](val graph: Graph,
    * @param vertex: vertex to serialize
    * @return serializer implementation
    */
-  def vertexSerializer(vertex: Vertex) = new VertexSerializable(vertex)
+  def vertexSerializer(vertex: Vertex): Serializable[Vertex] = new 
VertexSerializable(vertex)
 
   /**
    * create deserializer that can parse stored CanSKeyValue into snapshotEdge.
@@ -142,7 +133,7 @@ abstract class Storage[R](val graph: Graph,
     indexEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parser stored CanSKeyValue into vertex. */
-  val vertexDeserializer = new VertexDeserializable
+  val vertexDeserializer: Deserializable[Vertex] = new VertexDeserializable
 
 
   /**
@@ -170,7 +161,7 @@ abstract class Storage[R](val graph: Graph,
    * @param request
    * @return
    */
-  def fetchSnapshotEdgeKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
+  def fetchSnapshotEdgeKeyValues(request: QueryRequest): Future[Seq[SKeyValue]]
 
   /**
    * write requestKeyValue into storage if the current value in storage that 
is stored matches.
@@ -200,10 +191,11 @@ abstract class Storage[R](val graph: Graph,
    * build proper request which is specific into storage to call 
fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
    * for example, Asynchbase use GetRequest, Scanner so this method is 
responsible to build
    * client request(GetRequest, Scanner) based on user provided query.
-   * @param queryRequest
+    *
+    * @param queryRequest
    * @return
-   */
-  def buildRequest(queryRequest: QueryRequest): AnyRef
+    */
+  protected def buildRequest(queryRequest: QueryRequest, edge: Edge): Q
 
   /**
    * fetch IndexEdges for given queryParam in queryRequest.
@@ -215,53 +207,55 @@ abstract class Storage[R](val graph: Graph,
    * so single I/O return type should be Deferred[T].
    *
    * if we use native hbase client, then this return type can be Future[T] or 
just T.
-   * @param queryRequest
-   * @param prevStepScore
+    *
+    * @param queryRequest
    * @param isInnerCall
    * @param parentEdges
    * @return
    */
   def fetch(queryRequest: QueryRequest,
-            prevStepScore: Double,
             isInnerCall: Boolean,
             parentEdges: Seq[EdgeWithScore]): R
 
   /**
    * responsible to fire parallel fetch call into storage and create future 
that will return merged result.
-   * @param queryRequestWithScoreLs
+   *
+   * @param queryRequests
    * @param prevStepEdges
    * @return
    */
-  def fetches(queryRequestWithScoreLs: Seq[(QueryRequest, Double)],
-              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): 
Future[Seq[StepInnerResult]]
+  def fetches(queryRequests: Seq[QueryRequest],
+              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): 
Future[Seq[StepResult]]
 
   /**
    * fetch Vertex for given request from storage.
-   * @param request
+    *
+    * @param request
    * @return
    */
-  def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]]
+  def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]]
 
   /**
    * decide how to apply given edges(indexProps values + Map(_count -> 
countVal)) into storage.
-   * @param edges
+    *
+    * @param edges
    * @param withWait
    * @return
    */
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long)]]
+  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long, Long)]]
 
   /**
    * this method need to be called when client shutdown. this is responsible 
to cleanUp the resources
    * such as client into storage.
    */
   def flush(): Unit = {
-    exceptionHandler.shutdown()
   }
 
   /**
    * create table on storage.
    * if storage implementation does not support namespace or table, then there 
is nothing to be done
-   * @param zkAddr
+    *
+    * @param zkAddr
    * @param tableName
    * @param cfs
    * @param regionMultiplier
@@ -273,28 +267,29 @@ abstract class Storage[R](val graph: Graph,
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None,
+                  totalRegionCount: Option[Int] = None): Unit
 
 
 
 
 
   /** Public Interface */
-
   def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
-    def fromResult(queryParam: QueryParam,
-                   kvs: Seq[SKeyValue],
+    def fromResult(kvs: Seq[SKeyValue],
                    version: String): Option[Vertex] = {
       if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+      else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+//        .map(S2Vertex(graph, _))
     }
 
     val futures = vertices.map { vertex =>
       val queryParam = QueryParam.Empty
       val q = Query.toQuery(Seq(vertex), queryParam)
       val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-      fetchVertexKeyValues(buildRequest(queryRequest)).map { kvs =>
-        fromResult(queryParam, kvs, vertex.serviceColumn.schemaVersion)
+      fetchVertexKeyValues(queryRequest).map { kvs =>
+        fromResult(kvs, vertex.serviceColumn.schemaVersion)
       } recoverWith { case ex: Throwable =>
         Future.successful(None)
       }
@@ -302,92 +297,39 @@ abstract class Storage[R](val graph: Graph,
 
     Future.sequence(futures).map { result => result.toList.flatten }
   }
-
-  def mutateElements(elements: Seq[GraphElement],
-                     withWait: Boolean = false): Future[Seq[Boolean]] = {
-
-    val edgeBuffer = ArrayBuffer[Edge]()
-    val vertexBuffer = ArrayBuffer[Vertex]()
-
-    elements.foreach {
-      case e: Edge => edgeBuffer += e
-      case v: Vertex => vertexBuffer += v
-      case any@_ => logger.error(s"Unknown type: ${any}")
-    }
-
-    val edgeFuture = mutateEdges(edgeBuffer, withWait)
-    val vertexFuture = mutateVertices(vertexBuffer, withWait)
-
-    val graphFuture = for {
-      edgesMutated <- edgeFuture
-      verticesMutated <- vertexFuture
-    } yield edgesMutated ++ verticesMutated
-
-    graphFuture
-  }
-
-  def mutateEdges(edges: Seq[Edge], withWait: Boolean): Future[Seq[Boolean]] = 
{
-    val (strongEdges, weakEdges) =
-      (edges.partition(e => e.label.consistencyLevel == "strong" && e.op != 
GraphUtil.operations("insertBulk")))
-
-    val weakEdgesFutures = weakEdges.groupBy { e => e.label.hbaseZkAddr }.map 
{ case (zkQuorum, edges) =>
-      val mutations = edges.flatMap { edge =>
-        val (_, edgeUpdate) =
-          if (edge.op == GraphUtil.operations("delete")) 
Edge.buildDeleteBulk(None, edge)
-          else Edge.buildOperation(None, Seq(edge))
-
-        buildVertexPutsAsync(edge) ++ indexedEdgeMutations(edgeUpdate) ++
-          snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
-      }
-      writeToStorage(zkQuorum, mutations, withWait)
-    }
-    val strongEdgesFutures = mutateStrongEdges(strongEdges, withWait)
-    for {
-      weak <- Future.sequence(weakEdgesFutures)
-      strong <- strongEdgesFutures
-    } yield {
-      strong ++ weak
-    }
-  }
-
   def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): 
Future[Seq[Boolean]] = {
 
-    val grouped = _edges.groupBy { edge => (edge.label, 
edge.srcVertex.innerId, edge.tgtVertex.innerId) } toSeq
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
 
     val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
-      val (deleteAllEdges, edges) = edgeGroup.partition(_.op == 
GraphUtil.operations("deleteAll"))
-
-      // DeleteAll first
-      val deleteAllFutures = deleteAllEdges.map { edge =>
-        deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), 
edge.labelWithDir.dir, edge.ts)
-      }
-
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
       // After deleteAll, process others
-      lazy val mutateEdgeFutures = edges.toList match {
+      val mutateEdgeFutures = edges.toList match {
         case head :: tail =>
-          //          val strongConsistency = 
edges.head.label.consistencyLevel == "strong"
-          //          if (strongConsistency) {
           val edgeFuture = mutateEdgesInner(edges, checkConsistency = true , 
withWait)
 
           //TODO: decide what we will do on failure on vertex put
           val puts = buildVertexPutsAsync(head)
           val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, 
withWait)
           Seq(edgeFuture, vertexFuture)
-        //          } else {
-        //            edges.map { edge => mutateEdge(edge, withWait = 
withWait) }
-        //          }
         case Nil => Nil
       }
 
       val composed = for {
-        deleteRet <- Future.sequence(deleteAllFutures)
+//        deleteRet <- Future.sequence(deleteAllFutures)
         mutateRet <- Future.sequence(mutateEdgeFutures)
-      } yield deleteRet ++ mutateRet
+      } yield mutateRet
 
-      composed.map(_.forall(identity))
+      composed.map(_.forall(identity)).map { ret => idxs.map(idx => idx -> 
ret) }
     }
 
-    Future.sequence(mutateEdges)
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
   }
 
   def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = {
@@ -413,16 +355,22 @@ abstract class Storage[R](val graph: Graph,
                        checkConsistency: Boolean,
                        withWait: Boolean): Future[Boolean] = {
     assert(edges.nonEmpty)
+    // TODO: remove the `!checkConsistency` branch below after code review; it appears to be unreachable
     if (!checkConsistency) {
+
       val zkQuorum = edges.head.label.hbaseZkAddr
       val futures = edges.map { edge =>
         val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge))
-        val mutations = indexedEdgeMutations(edgeUpdate) ++ 
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
+
+        val mutations =
+          indexedEdgeMutations(edgeUpdate) ++ 
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
+
+
         writeToStorage(zkQuorum, mutations, withWait)
       }
       Future.sequence(futures).map { rets => rets.forall(identity) }
     } else {
-      fetchSnapshotEdge(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+      fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
         retry(1)(edges, 0, snapshotEdgeOpt)
       }
     }
@@ -438,9 +386,6 @@ abstract class Storage[R](val graph: Graph,
     if (tryNum >= MaxRetryNum) {
       edges.foreach { edge =>
         logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
-
-        val kafkaMessage = ExceptionHandler.toKafkaMessage(failTopic, element 
= edge)
-        exceptionHandler.enqueue(kafkaMessage)
       }
 
       Future.successful(false)
@@ -454,11 +399,10 @@ abstract class Storage[R](val graph: Graph,
         case FetchTimeoutException(retryEdge) =>
           logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
           /** fetch failed. re-fetch should be done */
-          fetchSnapshotEdge(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+          fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
             retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
           }
 
-
         case PartialFailureException(retryEdge, failedStatusCode, faileReason) 
=>
           val status = failedStatusCode match {
             case 0 => "AcquireLock failed."
@@ -477,7 +421,7 @@ abstract class Storage[R](val graph: Graph,
               val future = if (failedStatusCode == 0) {
                 // acquire Lock failed. other is mutating so this thead need 
to re-fetch snapshotEdge.
                 /** fetch failed. re-fetch should be done */
-                fetchSnapshotEdge(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
+                fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, 
snapshotEdgeOpt, kvOpt) =>
                   retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
                 }
               } else {
@@ -620,7 +564,8 @@ abstract class Storage[R](val graph: Graph,
   /**
    * orchestrate commit process.
    * we separate into 4 step to avoid duplicating each step over and over.
-   * @param statusCode: current statusCode of this thread to process edges.
+    *
+    * @param statusCode: current statusCode of this thread to process edges.
    * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge.
    * @param fetchedSnapshotEdgeOpt: fetched snapshotEdge from storage before 
commit process begin.
    * @param lockSnapshotEdge: lockEdge that hold necessary data to lock this 
snapshotEdge for this thread.
@@ -643,7 +588,7 @@ abstract class Storage[R](val graph: Graph,
     } yield lockReleased
   }
 
-  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: 
String) extends Exception
+  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: 
String) extends NoStackException(failReason)
 
   protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) 
= {
     val msg = Seq(s"[$ret] [$phase]", 
s"${snapshotEdge.toLogString()}").mkString("\n")
@@ -658,7 +603,8 @@ abstract class Storage[R](val graph: Graph,
 
   /**
    * try to acquire lock on storage for this given snapshotEdge(lockEdge).
-   * @param statusCode: current statusCode of this thread to process edges.
+    *
+    * @param statusCode: current statusCode of this thread to process edges.
    * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge. only for debug
    * @param fetchedSnapshotEdgeOpt: fetched snapshot edge from storage.
    * @param lockEdge: lockEdge to build RPC request(compareAndSet) into 
Storage.
@@ -708,7 +654,8 @@ abstract class Storage[R](val graph: Graph,
    * change this snapshot's state on storage from locked into committed by
    * storing new merged states on storage. merge state come from 
releaseLockEdge.
    * note that releaseLock return Future.failed on predicate failure.
-   * @param predicate: indicate if this releaseLock phase should be proceed or 
not.
+    *
+    * @param predicate: indicate if this releaseLock phase should be proceed 
or not.
    * @param statusCode: releaseLock do not use statusCode, only for debug.
    * @param squashedEdge: squashed(in memory) final edge from input edges on 
same snapshotEdge. only for debug
    * @param releaseLockEdge: final merged states if all process goes well.
@@ -797,6 +744,18 @@ abstract class Storage[R](val graph: Graph,
                           statusCode: Byte,
                           squashedEdge: Edge,
                           edgeMutate: EdgeMutate): Future[Boolean] = {
+
+    def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
+      writeToStorage(squashedEdge.label.hbaseZkAddr, kvs, withWait = 
withWait).map { ret =>
+        if (ret) {
+          debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
+        } else {
+          throw new PartialFailureException(squashedEdge, 2, "hbase fail.")
+        }
+        true
+      }
+    }
+
     if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 2, 
"predicate failed."))
     if (statusCode >= 3) {
       logger.debug(s"skip increment: 
[$statusCode]\n${squashedEdge.toLogString}")
@@ -804,21 +763,13 @@ abstract class Storage[R](val graph: Graph,
     } else {
       val p = Random.nextDouble()
       if (p < FailProb) Future.failed(new 
PartialFailureException(squashedEdge, 2, s"$p"))
-      else
-        writeToStorage(squashedEdge.label.hbaseZkAddr, increments(edgeMutate), 
withWait = true).map { ret =>
-          if (ret) {
-            debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
-          } else {
-            throw new PartialFailureException(squashedEdge, 2, "hbase fail.")
-          }
-          true
-        }
+      else {
+        val incrs = increments(edgeMutate)
+        _write(incrs, true)
+      }
     }
   }
 
-
-
-
   /** end of methods for consistency */
 
   def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
@@ -832,17 +783,18 @@ abstract class Storage[R](val graph: Graph,
 
 
   /** Delete All */
-  protected def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepInnerResult,
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
                                               requestTs: Long,
                                               retryNum: Int): Future[Boolean] 
= {
     if (stepInnerResult.isEmpty) Future.successful(true)
     else {
-      val head = stepInnerResult.edgesWithScoreLs.head
+      val head = stepInnerResult.edgeWithScores.head
       val zkQuorum = head.edge.label.hbaseZkAddr
       val futures = for {
-        edgeWithScore <- stepInnerResult.edgesWithScoreLs
-        (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
+        edgeWithScore <- stepInnerResult.edgeWithScores
       } yield {
+          val edge = edgeWithScore.edge
+          val score = edgeWithScore.score
           /** reverted direction */
           val reversedIndexedEdgesMutations = 
edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
             indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
@@ -861,141 +813,6 @@ abstract class Storage[R](val graph: Graph,
     }
   }
 
-  protected def buildEdgesToDelete(stepInnerResult: StepInnerResult, 
requestTs: Long): StepInnerResult = {
-    val filtered = stepInnerResult.edgesWithScoreLs.filter { edgeWithScore =>
-      (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
-    }
-    if (filtered.isEmpty) StepInnerResult.Empty
-    else {
-      val head = filtered.head
-      val label = head.edge.label
-      val edgeWithScoreLs = filtered.map { edgeWithScore =>
-        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match 
{
-          case "strong" =>
-            val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
-              Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
-            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-          case _ =>
-            val oldEdge = edgeWithScore.edge
-            (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
-        }
-
-        val copiedEdge =
-          edgeWithScore.edge.copy(op = newOp, version = newVersion, 
propsWithTs = newPropsWithTs)
-
-        val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
-        //      logger.debug(s"delete edge from deleteAll: 
${edgeToDelete.edge.toLogString}")
-        edgeToDelete
-      }
-      //Degree edge?
-      StepInnerResult(edgeWithScoreLs, Nil, false)
-    }
-  }
-
-  protected def deleteAllFetchedEdgesLs(stepInnerResultLs: 
Seq[StepInnerResult],
-                                        requestTs: Long): Future[(Boolean, 
Boolean)] = {
-    stepInnerResultLs.foreach { stepInnerResult =>
-      if (stepInnerResult.isFailure) throw new RuntimeException("fetched 
result is fallback.")
-    }
-    val futures = for {
-      stepInnerResult <- stepInnerResultLs
-      deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
-      if deleteStepInnerResult.edgesWithScoreLs.nonEmpty
-    } yield {
-        val head = deleteStepInnerResult.edgesWithScoreLs.head
-        val label = head.edge.label
-        label.schemaVersion match {
-          case HBaseType.VERSION3 | HBaseType.VERSION4 =>
-            if (label.consistencyLevel == "strong") {
-              /**
-               * read: snapshotEdge on queryResult = O(N)
-               * write: N x (relatedEdges x indices(indexedEdge) + 
1(snapshotEdge))
-               */
-              mutateEdges(deleteStepInnerResult.edgesWithScoreLs.map(_.edge), 
withWait = true).map(_.forall(identity))
-            } else {
-              deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, 
MaxRetryNum)
-            }
-          case _ =>
-
-            /**
-             * read: x
-             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x 
indices)
-             */
-            deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, 
MaxRetryNum)
-        }
-      }
-
-    if (futures.isEmpty) {
-      // all deleted.
-      Future.successful(true -> true)
-    } else {
-      Future.sequence(futures).map { rets => false -> rets.forall(identity) }
-    }
-  }
-
-  protected def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): 
Future[(Boolean, Boolean)] = {
-    val future = for {
-      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_)))
-      (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, 
requestTs)
-    } yield {
-//        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
-        (allDeleted, ret)
-      }
-
-    Extensions.retryOnFailure(MaxRetryNum) {
-      future
-    } {
-      logger.error(s"fetch and deleteAll failed.")
-      (true, false)
-    }
-
-  }
-
-  def deleteAllAdjacentEdges(srcVertices: Seq[Vertex],
-                             labels: Seq[Label],
-                             dir: Int,
-                             ts: Long): Future[Boolean] = {
-
-    def enqueueLogMessage() = {
-      val kafkaMessages = for {
-        vertice <- srcVertices
-        id = vertice.innerId.toIdString()
-        label <- labels
-      } yield {
-          val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", 
GraphUtil.fromOp(dir.toByte)).mkString("\t")
-          ExceptionHandler.toKafkaMessage(failTopic, tsv)
-        }
-
-      kafkaMessages.foreach(exceptionHandler.enqueue)
-    }
-
-    val requestTs = ts
-    /** create query per label */
-    val queries = for {
-      label <- labels
-    } yield {
-        val labelWithDir = LabelWithDirection(label.id.get, dir)
-        val queryParam = QueryParam(labelWithDir).limit(0, 
DeleteAllFetchSize).duplicatePolicy(Option(Query.DuplicatePolicy.Raw))
-        val step = Step(List(queryParam))
-        Query(srcVertices, Vector(step))
-      }
-
-    //    Extensions.retryOnSuccessWithBackoff(MaxRetryNum, 
Random.nextInt(MaxBackOff) + 1) {
-        val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
-      fetchAndDeleteAll(queries, requestTs)
-    } { case (allDeleted, deleteSuccess) =>
-      allDeleted && deleteSuccess
-    }.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
-
-    retryFuture onFailure {
-      case ex =>
-        logger.error(s"[Error]: deleteAllAdjacentEdges failed.")
-        enqueueLogMessage()
-    }
-
-    retryFuture
-  }
-
   /** End Of Delete All */
 
 
@@ -1003,29 +820,34 @@ abstract class Storage[R](val graph: Graph,
 
   /** Parsing Logic: parse from kv from Storage into Edge */
   def toEdge[K: CanSKeyValue](kv: K,
-                              queryParam: QueryParam,
+                              queryRequest: QueryRequest,
                               cacheElementOpt: Option[IndexEdge],
                               parentEdges: Seq[EdgeWithScore]): Option[Edge] = 
{
-//        logger.debug(s"toEdge: $kv")
+    logger.debug(s"toEdge: $kv")
+
     try {
+      val queryOption = queryRequest.query.queryOption
+      val queryParam = queryRequest.queryParam
       val schemaVer = queryParam.label.schemaVersion
-      val indexEdgeOpt = 
indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), 
queryParam.label.schemaVersion, cacheElementOpt)
-      indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = 
parentEdges))
+      val indexEdgeOpt = 
indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), 
Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+      if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => 
indexEdge.toEdge.copy(parentEdges = parentEdges))
+      else indexEdgeOpt.map(indexEdge => indexEdge.toEdge)
     } catch {
       case ex: Exception =>
-        logger.error(s"Fail on toEdge: ${kv.toString}, ${queryParam}", ex)
+        logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex)
         None
     }
   }
 
   def toSnapshotEdge[K: CanSKeyValue](kv: K,
-                                      queryParam: QueryParam,
+                                      queryRequest: QueryRequest,
                                       cacheElementOpt: Option[SnapshotEdge] = 
None,
                                       isInnerCall: Boolean,
                                       parentEdges: Seq[EdgeWithScore]): 
Option[Edge] = {
 //        logger.debug(s"SnapshottoEdge: $kv")
+    val queryParam = queryRequest.queryParam
     val schemaVer = queryParam.label.schemaVersion
-    val snapshotEdgeOpt = 
snapshotEdgeDeserializer(schemaVer).fromKeyValues(queryParam, Seq(kv), 
queryParam.label.schemaVersion, cacheElementOpt)
+    val snapshotEdgeOpt = 
snapshotEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), 
Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
 
     if (isInnerCall) {
       snapshotEdgeOpt.flatMap { snapshotEdge =>
@@ -1045,87 +867,137 @@ abstract class Storage[R](val graph: Graph,
     }
   }
 
+  val dummyCursor: Array[Byte] = Array.empty
+
   def toEdges[K: CanSKeyValue](kvs: Seq[K],
-                               queryParam: QueryParam,
+                               queryRequest: QueryRequest,
                                prevScore: Double = 1.0,
                                isInnerCall: Boolean,
                                parentEdges: Seq[EdgeWithScore],
                                startOffset: Int = 0,
-                               len: Int = Int.MaxValue): Seq[EdgeWithScore] = {
-    if (kvs.isEmpty) Seq.empty
+                               len: Int = Int.MaxValue): StepResult = {
+
+    val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _
+
+    if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
     else {
+      val queryOption = queryRequest.query.queryOption
+      val queryParam = queryRequest.queryParam
+      val labelWeight = queryRequest.labelWeight
+      val nextStepOpt = queryRequest.nextStepOpt
+      val where = queryParam.where.get
+      val label = queryParam.label
+      val isDefaultTransformer = queryParam.edgeTransformer.isDefault
       val first = kvs.head
       val kv = first
       val schemaVer = queryParam.label.schemaVersion
       val cacheElementOpt =
         if (queryParam.isSnapshotEdge) None
-        else indexEdgeDeserializer(schemaVer).fromKeyValues(queryParam, 
Seq(kv), queryParam.label.schemaVersion, None)
+        else 
indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), 
Seq(kv), queryParam.label.schemaVersion, None)
+
+      val (degreeEdges, keyValues) = cacheElementOpt match {
+        case None => (Nil, kvs)
+        case Some(cacheElement) =>
+          val head = cacheElement.toEdge
+          if (!head.isDegree) (Nil, kvs)
+          else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
+      }
 
-      for {
-        (kv, idx) <- kvs.zipWithIndex if idx >= startOffset && idx < 
startOffset + len
-        edge <-
-        if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryParam, None, 
isInnerCall, parentEdges)
-        else toEdge(kv, queryParam, cacheElementOpt, parentEdges)
-      } yield {
-        //TODO: Refactor this.
-        val currentScore =
-          queryParam.scorePropagateOp match {
-            case "plus" => edge.rank(queryParam.rank) + prevScore
-            case "divide" =>
-              if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
-              else edge.rank(queryParam.rank) / (prevScore + 
queryParam.scorePropagateShrinkage)
-            case _ => edge.rank(queryParam.rank) * prevScore
+      val lastCursor: Seq[Array[Byte]] = Seq(if (keyValues.nonEmpty) 
toSKeyValue(keyValues(keyValues.length - 1)).row else dummyCursor)
+
+      if (!queryOption.ignorePrevStepCache) {
+        val edgeWithScores = for {
+          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < 
startOffset + len
+          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, 
queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, 
cacheElementOpt, parentEdges)).toSeq
+          if where == WhereParser.success || where.filter(edge)
+          convertedEdge <- if (isDefaultTransformer) Seq(edge) else 
convertEdges(queryParam, edge, nextStepOpt)
+        } yield {
+            val score = edge.rank(queryParam.rank)
+            EdgeWithScore(convertedEdge, score, label)
+          }
+        StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges 
= degreeEdges, cursors = lastCursor)
+      } else {
+        val degreeScore = 0.0
+
+        val edgeWithScores = for {
+          (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < 
startOffset + len
+          edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, 
queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, 
cacheElementOpt, parentEdges)).toSeq
+          if where == WhereParser.success || where.filter(edge)
+          convertedEdge <- if (isDefaultTransformer) Seq(edge) else 
convertEdges(queryParam, edge, nextStepOpt)
+        } yield {
+            val edgeScore = edge.rank(queryParam.rank)
+            val score = queryParam.scorePropagateOp match {
+              case "plus" => edgeScore + prevScore
+              case "divide" =>
+                if ((prevScore + queryParam.scorePropagateShrinkage) == 0) 0
+                else edgeScore / (prevScore + 
queryParam.scorePropagateShrinkage)
+              case _ => edgeScore * prevScore
+            }
+            val tsVal = processTimeDecay(queryParam, edge)
+            val newScore = degreeScore + score
+            EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score 
= newScore * labelWeight * tsVal, label = label)
           }
-        EdgeWithScore(edge, currentScore)
+
+        val sampled =
+          if (queryRequest.queryParam.sample >= 0) sample(queryRequest, 
edgeWithScores, queryRequest.queryParam.sample)
+          else edgeWithScores
+
+        val normalized = if (queryParam.shouldNormalize) normalize(sampled) 
else sampled
+
+        StepResult(edgeWithScores = normalized, grouped = Nil, degreeEdges = 
degreeEdges, cursors = lastCursor)
       }
     }
   }
 
   /** End Of Parse Logic */
 
-  protected def toRequestEdge(queryRequest: QueryRequest): Edge = {
+  protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: 
Seq[EdgeWithScore]): Edge = {
     val srcVertex = queryRequest.vertex
-    //    val tgtVertexOpt = queryRequest.tgtVertexOpt
-    val edgeCf = Serializable.edgeCf
     val queryParam = queryRequest.queryParam
     val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
     val label = queryParam.label
     val labelWithDir = queryParam.labelWithDir
     val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
-    val (srcInnerId, tgtInnerId) = tgtVertexIdOpt match {
+    val propsWithTs = label.EmptyPropsWithTs
+
+    tgtVertexIdOpt match {
       case Some(tgtVertexId) => // _to is given.
         /** we use toSnapshotEdge so dont need to swap src, tgt */
         val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
         val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, 
label.schemaVersion)
-        (src, tgt)
+        val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, src), 
TargetVertexId(tgtColumn.id.get, tgt))
+        val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
+
+        Edge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
       case None =>
         val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-        (src, src)
-    }
+        val srcVId = SourceVertexId(srcColumn.id.get, src)
+        val srcV = Vertex(srcVId)
 
-    val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, srcInnerId), 
TargetVertexId(tgtColumn.id.get, tgtInnerId))
-    val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
-    val currentTs = System.currentTimeMillis()
-    val propsWithTs = Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), 
currentTs)).toMap
-    Edge(srcV, tgtV, labelWithDir, propsWithTs = propsWithTs)
+        Edge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, 
parentEdges = parentEdges)
+    }
   }
 
+  protected def fetchSnapshotEdgeInner(edge: Edge): Future[(QueryParam, 
Option[Edge], Option[SKeyValue])] = {
+    /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
+      * so use empty cacheKey.
+      * */
+    val queryParam = QueryParam(labelName = edge.label.label,
+      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
+    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
+    //    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
 
 
-  protected def fetchSnapshotEdge(edge: Edge): Future[(QueryParam, 
Option[Edge], Option[SKeyValue])] = {
-    val labelWithDir = edge.labelWithDir
-    val queryParam = QueryParam(labelWithDir)
-    val _queryParam = 
queryParam.tgtVertexInnerIdOpt(Option(edge.tgtVertex.innerId))
-    val q = Query.toQuery(Seq(edge.srcVertex), _queryParam)
-    val queryRequest = QueryRequest(q, 0, edge.srcVertex, _queryParam)
-
-    fetchSnapshotEdgeKeyValues(buildRequest(queryRequest)).map { kvs =>
+    fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
       val (edgeOpt, kvOpt) =
         if (kvs.isEmpty) (None, None)
         else {
-          val _edgeOpt = toEdges(kvs, queryParam, 1.0, isInnerCall = true, 
parentEdges = Nil).headOption.map(_.edge)
+          val snapshotEdgeOpt = toSnapshotEdge(kvs.head, queryRequest, 
isInnerCall = true, parentEdges = Nil)
           val _kvOpt = kvs.headOption
-          (_edgeOpt, _kvOpt)
+          (snapshotEdgeOpt, _kvOpt)
         }
       (queryParam, edgeOpt, kvOpt)
     } recoverWith { case ex: Throwable =>
@@ -1134,182 +1006,7 @@ abstract class Storage[R](val graph: Graph,
     }
   }
 
-  protected def fetchStep(orgQuery: Query,
-                          stepIdx: Int,
-                          stepInnerResult: StepInnerResult): 
Future[StepInnerResult] = {
-    if (stepInnerResult.isEmpty) Future.successful(StepInnerResult.Empty)
-    else {
-      val edgeWithScoreLs = stepInnerResult.edgesWithScoreLs
-
-      val q = orgQuery
-
-      val prevStepOpt = if (stepIdx > 0) Option(q.steps(stepIdx - 1)) else None
-      val prevStepThreshold = 
prevStepOpt.map(_.nextStepScoreThreshold).getOrElse(QueryParam.DefaultThreshold)
-      val prevStepLimit = prevStepOpt.map(_.nextStepLimit).getOrElse(-1)
-      val step = q.steps(stepIdx)
-
-      val alreadyVisited =
-        if (stepIdx == 0) Map.empty[(LabelWithDirection, Vertex), Boolean]
-        else Graph.alreadyVisitedVertices(stepInnerResult.edgesWithScoreLs)
-
-      val groupedBy = edgeWithScoreLs.map { case edgeWithScore =>
-        edgeWithScore.edge.tgtVertex -> edgeWithScore
-      }.groupBy { case (vertex, edgeWithScore) => vertex }
-
-      val groupedByFiltered = for {
-        (vertex, edgesWithScore) <- groupedBy
-        aggregatedScore = edgesWithScore.map(_._2.score).sum if 
aggregatedScore >= prevStepThreshold
-      } yield vertex -> aggregatedScore
-
-      val prevStepTgtVertexIdEdges = for {
-        (vertex, edgesWithScore) <- groupedBy
-      } yield vertex.id -> edgesWithScore.map { case (vertex, edgeWithScore) 
=> edgeWithScore }
-
-      val nextStepSrcVertices = if (prevStepLimit >= 0) {
-        groupedByFiltered.toSeq.sortBy(-1 * _._2).take(prevStepLimit)
-      } else {
-        groupedByFiltered.toSeq
-      }
-
-      val queryRequests = for {
-        (vertex, prevStepScore) <- nextStepSrcVertices
-        queryParam <- step.queryParams
-      } yield (QueryRequest(q, stepIdx, vertex, queryParam), prevStepScore)
-
-      val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
-      Graph.filterEdges(orgQuery, stepIdx, queryRequests.map(_._1), fetchedLs, 
orgQuery.steps(stepIdx).queryParams, alreadyVisited)(ec)
-    }
-  }
-  private def getEdgesStepInner(q: Query): Future[StepInnerResult] = {
-    Try {
-      if (q.steps.isEmpty) innerFallback
-      else {
-        // current stepIdx = -1
-        val startStepInnerResult = QueryResult.fromVertices(q)
-        q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) 
{ case (prevStepInnerResultFuture, (step, stepIdx)) =>
-          for {
-            prevStepInnerResult <- prevStepInnerResultFuture
-            currentStepInnerResult <- fetchStep(q, stepIdx, 
prevStepInnerResult)
-          } yield currentStepInnerResult
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        innerFallback
-    } get
-  }
-  def getEdges(q: Query): Future[StepResult] = {
-    Try {
-      if (q.steps.isEmpty) {
-        // TODO: this should be get vertex query.
-        fallback
-      } else {
-        val filterOutFuture = q.queryOption.filterOutQuery match {
-          case None => innerFallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
-        }
-        for {
-          innerResult <- getEdgesStepInner(q)
-          filterOutInnerResult <- filterOutFuture
-        } yield {
-          val result = StepResult(graph, q.queryOption, innerResult)
-          if (filterOutInnerResult.isEmpty) result
-          else {
-            StepResult.filterOut(graph, q.queryOption, result, 
filterOutInnerResult)
-          }
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
 
-  def getEdgesMultiQuery(mq: MultiQuery): Future[StepResult] = {
-    val fallback = Future.successful(StepResult.Empty)
-
-    Try {
-      if (mq.queries.isEmpty) fallback
-      else {
-        val filterOutFuture = mq.queryOption.filterOutQuery match {
-          case None => innerFallback
-          case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
-        }
-
-        val multiQueryFutures = Future.sequence(mq.queries.map { query => 
getEdges(query) })
-        for {
-          multiQueryResults <- multiQueryFutures
-          filterOutInnerResult <- filterOutFuture
-        } yield {
-          val merged = StepResult.merges(mq.queryOption, multiQueryResults, 
mq.weights)
-          StepResult.filterOut(graph, mq.queryOption, merged, 
filterOutInnerResult)
-        }
-      }
-    } recover {
-      case e: Exception =>
-        logger.error(s"getEdgesAsync: $e", e)
-        fallback
-    } get
-  }
-
-  def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): 
Future[StepResult] = {
-    val ts = System.currentTimeMillis()
-    val futures = for {
-      (srcVertex, tgtVertex, queryParam) <- params
-      propsWithTs = Map(LabelMeta.timeStampSeq -> 
InnerValLikeWithTs.withLong(ts, ts, queryParam.label.schemaVersion))
-      edge = Edge(srcVertex, tgtVertex, queryParam.labelWithDir, propsWithTs = 
propsWithTs)
-    } yield {
-        fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-          edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0))
-        }
-      }
-
-    Future.sequence(futures).map { edgeWithScoreLs =>
-      val s2EdgeWithScoreLs = edgeWithScoreLs.flatMap { ls =>
-        ls.map { edgeWithScore =>
-          S2EdgeWithScore(edgeWithScore.edge, edgeWithScore.score)
-        }
-      }
-      StepResult(results = s2EdgeWithScoreLs, grouped = Nil, degreeEdges = Nil)
-    }
-  }
-
-
-
-  @tailrec
-  final def randomInt(sampleNumber: Int, range: Int, set: Set[Int] = 
Set.empty[Int]): Set[Int] = {
-    if (range < sampleNumber || set.size == sampleNumber) set
-    else randomInt(sampleNumber, range, set + Random.nextInt(range))
-  }
-
-  protected def sample(queryRequest: QueryRequest, edges: Seq[EdgeWithScore], 
n: Int): Seq[EdgeWithScore] = {
-    if (edges.size <= n){
-      edges
-    }else{
-      val plainEdges = if (queryRequest.queryParam.offset == 0) {
-        edges.tail
-      } else edges
-
-      val randoms = randomInt(n, plainEdges.size)
-      var samples = List.empty[EdgeWithScore]
-      var idx = 0
-      plainEdges.foreach { e =>
-        if (randoms.contains(idx)) samples = e :: samples
-        idx += 1
-      }
-      samples.toSeq
-    }
-
-  }
-
-  protected def normalize(edgeWithScores: Seq[EdgeWithScore]): 
Seq[EdgeWithScore] = {
-    val sum = edgeWithScores.foldLeft(0.0) { case (acc, cur) => acc + 
cur.score }
-    edgeWithScores.map { edgeWithScore =>
-      edgeWithScore.copy(score = edgeWithScore.score / sum)
-    }
-  }
   /** end of query */
 
   /** Mutation Builder */
@@ -1317,51 +1014,78 @@ abstract class Storage[R](val graph: Graph,
 
   /** EdgeMutate */
   def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+    // skip sampling for delete operation
     val deleteMutations = edgeMutate.edgesToDelete.flatMap { indexEdge =>
-      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete))
+      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete, durability = indexEdge.label.durability))
     }
+
     val insertMutations = edgeMutate.edgesToInsert.flatMap { indexEdge =>
-      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put))
+      if (indexEdge.isOutEdge) 
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put, durability = indexEdge.label.durability))
+      else {
+        // For InEdge
+        indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put, durability = indexEdge.label.durability))
+      }
     }
 
     deleteMutations ++ insertMutations
   }
 
   def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
-    edgeMutate.newSnapshotEdge.map(e => 
snapshotEdgeSerializer(e).toKeyValues).getOrElse(Nil)
+    edgeMutate.newSnapshotEdge.map(e => 
snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = 
e.label.durability))).getOrElse(Nil)
+
+  def incrementsInOut(edgeMutate: EdgeMutate): (Seq[SKeyValue], 
Seq[SKeyValue]) = {
+
+    def filterOutDegree(e: IndexEdge): Boolean = true
 
-  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] =
     (edgeMutate.edgesToDelete.isEmpty, edgeMutate.edgesToInsert.isEmpty) match 
{
       case (true, true) =>
 
         /** when there is no need to update. shouldUpdate == false */
-        List.empty
+        (Nil, Nil)
       case (true, false) =>
 
         /** no edges to delete but there is new edges to insert so increase 
degree by 1 */
-        edgeMutate.edgesToInsert.flatMap { e => buildIncrementsAsync(e) }
+        val (inEdges, outEdges) = 
edgeMutate.edgesToInsert.partition(_.isInEdge)
+
+        val in = 
inEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_))
+        val out = 
outEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_))
+
+        in -> out
       case (false, true) =>
 
         /** no edges to insert but there is old edges to delete so decrease 
degree by 1 */
-        edgeMutate.edgesToDelete.flatMap { e => buildIncrementsAsync(e, -1L) }
+        val (inEdges, outEdges) = 
edgeMutate.edgesToDelete.partition(_.isInEdge)
+
+        val in = 
inEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_, -1))
+        val out = 
outEdges.filter(filterOutDegree).flatMap(buildIncrementsAsync(_, -1))
+
+        in -> out
       case (false, false) =>
 
         /** update on existing edges so no change on degree */
-        List.empty
+        (Nil, Nil)
     }
+  }
+
+  def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] = {
+    val (in, out) = incrementsInOut(edgeMutate)
+    in ++ out
+  }
 
   /** IndexEdge */
   def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): 
Seq[SKeyValue] = {
-    val newProps = indexedEdge.props ++ Map(LabelMeta.degreeSeq -> 
InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
+    val newProps = indexedEdge.props ++ Map(LabelMeta.degree -> 
InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
     val _indexedEdge = indexedEdge.copy(props = newProps)
-    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Increment))
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Increment, durability = _indexedEdge.label.durability))
   }
 
   def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): 
Seq[SKeyValue] = {
-    val newProps = indexedEdge.props ++ Map(LabelMeta.countSeq -> 
InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
+    val newProps = indexedEdge.props ++ Map(LabelMeta.count -> 
InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
     val _indexedEdge = indexedEdge.copy(props = newProps)
-    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Increment))
+    indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Increment, durability = _indexedEdge.label.durability))
   }
+
+  //TODO: ServiceColumn do not have durability property yet.
   def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = {
     val kvs = vertexSerializer(vertex).toKeyValues
     val kv = kvs.head
@@ -1383,10 +1107,23 @@ abstract class Storage[R](val graph: Graph,
     }
   }
 
+  def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = {
+    val kvs = edge.edgesWithIndexValid.flatMap { _indexEdge =>
+      val newProps = Map(LabelMeta.degree -> 
InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer))
+      val indexEdge = _indexEdge.copy(props = newProps)
+
+      indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put, durability = indexEdge.label.durability))
+    }
+
+    kvs
+  }
+
   def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
     vertex.op match {
       case d: Byte if d == GraphUtil.operations("delete") => 
vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
       case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = 
SKeyValue.Put))
     }
   }
+
+  def info: Map[String, String] = Map("className" -> 
this.getClass.getSimpleName)
 }

Reply via email to