Repository: incubator-s2graph
Updated Branches:
  refs/heads/master f74c224ac -> 247b2cb9d


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 2d34c7a..1a85dba 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -54,7 +54,7 @@ object RestHandler {
   * Public API, only return Future.successful or Future.failed
   * Don't throw exception
   */
-class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
+class RestHandler(graph: S2Graph)(implicit ec: ExecutionContext) {
 
   import RestHandler._
   val requestParser = new RequestParser(graph)
@@ -172,7 +172,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
   }
 
   def getEdgesAsync(jsonQuery: JsValue, impIdOpt: Option[String] = None)
-                   (post: (Graph, QueryOption, StepResult) => JsValue): 
Future[JsValue] = {
+                   (post: (S2Graph, QueryOption, StepResult) => JsValue): 
Future[JsValue] = {
 
     def query(obj: JsValue): Future[JsValue] = {
       (obj \ "queries").asOpt[JsValue] match {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index efe7a3d..59b7518 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -39,10 +39,10 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.hadoop.hbase.util.Bytes
 
 
-abstract class Storage[Q, R](val graph: Graph,
+abstract class Storage[Q, R](val graph: S2Graph,
                           val config: Config)(implicit ec: ExecutionContext) {
   import HBaseType._
-  import Graph._
+  import S2Graph._
 
   val BackoffTimeout = graph.BackoffTimeout
   val MaxRetryNum = graph.MaxRetryNum
@@ -100,7 +100,7 @@ abstract class Storage[Q, R](val graph: Graph,
    * @param vertex: vertex to serialize
    * @return serializer implementation
    */
-  def vertexSerializer(vertex: Vertex): Serializable[Vertex] = new 
VertexSerializable(vertex)
+  def vertexSerializer(vertex: S2Vertex): Serializable[S2Vertex] = new 
VertexSerializable(vertex)
 
   /**
    * create deserializer that can parse stored CanSKeyValue into snapshotEdge.
@@ -122,7 +122,7 @@ abstract class Storage[Q, R](val graph: Graph,
     snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
-  val indexEdgeDeserializers: Map[String, Deserializable[Edge]] = Map(
+  val indexEdgeDeserializers: Map[String, Deserializable[S2Edge]] = Map(
     VERSION1 -> new IndexEdgeDeserializable(graph),
     VERSION2 -> new IndexEdgeDeserializable(graph),
     VERSION3 -> new IndexEdgeDeserializable(graph),
@@ -133,7 +133,7 @@ abstract class Storage[Q, R](val graph: Graph,
     indexEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parser stored CanSKeyValue into vertex. */
-  val vertexDeserializer: Deserializable[Vertex] = new 
VertexDeserializable(graph)
+  val vertexDeserializer: Deserializable[S2Vertex] = new 
VertexDeserializable(graph)
 
 
   /**
@@ -195,7 +195,7 @@ abstract class Storage[Q, R](val graph: Graph,
     * @param queryRequest
    * @return
     */
-  protected def buildRequest(queryRequest: QueryRequest, edge: Edge): Q
+  protected def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q
 
   /**
    * fetch IndexEdges for given queryParam in queryRequest.
@@ -242,7 +242,7 @@ abstract class Storage[Q, R](val graph: Graph,
    * @param withWait
    * @return
    */
-  def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long, Long)]]
+  def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long, Long)]]
 
   /**
    * this method need to be called when client shutdown. this is responsible 
to cleanUp the resources
@@ -276,9 +276,9 @@ abstract class Storage[Q, R](val graph: Graph,
 
 
   /** Public Interface */
-  def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+  def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
     def fromResult(kvs: Seq[SKeyValue],
-                   version: String): Option[Vertex] = {
+                   version: String): Option[S2Vertex] = {
       if (kvs.isEmpty) None
       else vertexDeserializer.fromKeyValues(None, kvs, version, None)
 //        .map(S2Vertex(graph, _))
@@ -286,7 +286,7 @@ abstract class Storage[Q, R](val graph: Graph,
 
     val futures = vertices.map { vertex =>
       val queryParam = QueryParam.Empty
-      val q = Query.toQuery(Seq(vertex), queryParam)
+      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
       val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
       fetchVertexKeyValues(queryRequest).map { kvs =>
         fromResult(kvs, vertex.serviceColumn.schemaVersion)
@@ -297,7 +297,7 @@ abstract class Storage[Q, R](val graph: Graph,
 
     Future.sequence(futures).map { result => result.toList.flatten }
   }
-  def mutateStrongEdges(_edges: Seq[Edge], withWait: Boolean): 
Future[Seq[Boolean]] = {
+  def mutateStrongEdges(_edges: Seq[S2Edge], withWait: Boolean): 
Future[Seq[Boolean]] = {
 
     val edgeWithIdxs = _edges.zipWithIndex
     val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
@@ -332,7 +332,7 @@ abstract class Storage[Q, R](val graph: Graph,
     }
   }
 
-  def mutateVertex(vertex: Vertex, withWait: Boolean): Future[Boolean] = {
+  def mutateVertex(vertex: S2Vertex, withWait: Boolean): Future[Boolean] = {
     if (vertex.op == GraphUtil.operations("delete")) {
       writeToStorage(vertex.hbaseZkAddr,
         vertexSerializer(vertex).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)), withWait)
@@ -344,14 +344,14 @@ abstract class Storage[Q, R](val graph: Graph,
     }
   }
 
-  def mutateVertices(vertices: Seq[Vertex],
+  def mutateVertices(vertices: Seq[S2Vertex],
                      withWait: Boolean = false): Future[Seq[Boolean]] = {
     val futures = vertices.map { vertex => mutateVertex(vertex, withWait) }
     Future.sequence(futures)
   }
 
 
-  def mutateEdgesInner(edges: Seq[Edge],
+  def mutateEdgesInner(edges: Seq[S2Edge],
                        checkConsistency: Boolean,
                        withWait: Boolean): Future[Boolean] = {
     assert(edges.nonEmpty)
@@ -360,7 +360,7 @@ abstract class Storage[Q, R](val graph: Graph,
 
       val zkQuorum = edges.head.innerLabel.hbaseZkAddr
       val futures = edges.map { edge =>
-        val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge))
+        val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
 
         val mutations =
           indexedEdgeMutations(edgeUpdate) ++ 
snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate)
@@ -382,7 +382,7 @@ abstract class Storage[Q, R](val graph: Graph,
     Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
   }
 
-  def retry(tryNum: Int)(edges: Seq[Edge], statusCode: Byte, 
fetchedSnapshotEdgeOpt: Option[Edge]): Future[Boolean] = {
+  def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, 
fetchedSnapshotEdgeOpt: Option[S2Edge]): Future[Boolean] = {
     if (tryNum >= MaxRetryNum) {
       edges.foreach { edge =>
         logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
@@ -442,9 +442,9 @@ abstract class Storage[Q, R](val graph: Graph,
     }
   }
 
-  protected def commitUpdate(edges: Seq[Edge],
+  protected def commitUpdate(edges: Seq[S2Edge],
                              statusCode: Byte,
-                             fetchedSnapshotEdgeOpt: Option[Edge]): 
Future[Boolean] = {
+                             fetchedSnapshotEdgeOpt: Option[S2Edge]): 
Future[Boolean] = {
 //    Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
     assert(edges.nonEmpty)
 //    assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
@@ -460,7 +460,7 @@ abstract class Storage[Q, R](val graph: Graph,
            * lock = (squashedEdge, pendingE)
            * releaseLock = (edgeMutate.newSnapshotEdge, None)
            */
-            val (squashedEdge, edgeMutate) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+            val (squashedEdge, edgeMutate) = 
S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
 
             assert(edgeMutate.newSnapshotEdge.isDefined)
 
@@ -482,7 +482,7 @@ abstract class Storage[Q, R](val graph: Graph,
                  * lock = (snapshotEdge, pendingE)
                  * releaseLock = (edgeMutate.newSnapshotEdge, None)
                  */
-                val (squashedEdge, edgeMutate) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+                val (squashedEdge, edgeMutate) = 
S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
                 if (edgeMutate.newSnapshotEdge.isEmpty) {
                   logger.debug(s"drop this requests: 
\n${edges.map(_.toLogString).mkString("\n")}")
                   Future.successful(true)
@@ -508,8 +508,8 @@ abstract class Storage[Q, R](val graph: Graph,
                    */
                   logger.debug(s"${pendingEdge.toLogString} has been expired.")
                   val (squashedEdge, edgeMutate) =
-                    if (pendingEdge.ts == snapshotEdge.ts) 
Edge.buildOperation(None, pendingEdge +: edges)
-                    else Edge.buildOperation(fetchedSnapshotEdgeOpt, 
pendingEdge +: edges)
+                    if (pendingEdge.ts == snapshotEdge.ts) 
S2Edge.buildOperation(None, pendingEdge +: edges)
+                    else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, 
pendingEdge +: edges)
 
                   val lockTs = Option(System.currentTimeMillis())
                   val newPendingEdge = squashedEdge.copy(statusCode = 1, 
lockTs = lockTs, version = snapshotEdge.version + 1)
@@ -524,7 +524,7 @@ abstract class Storage[Q, R](val graph: Graph,
                    * this can't be proceed so retry from re-fetch.
                    * throw EX
                    */
-                  val (squashedEdge, _) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
+                  val (squashedEdge, _) = 
S2Edge.buildOperation(fetchedSnapshotEdgeOpt, edges)
                   Future.failed(new PartialFailureException(squashedEdge, 0, 
s"others[${pendingEdge.ts}] is mutating. me[${squashedEdge.ts}]"))
                 }
             }
@@ -551,7 +551,7 @@ abstract class Storage[Q, R](val graph: Graph,
         val _edges =
           if (fetchedSnapshotEdgeOpt.isDefined && 
fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) 
fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
           else edges
-        val (squashedEdge, edgeMutate) = 
Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
+        val (squashedEdge, edgeMutate) = 
S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
         val newVersion = 
fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
         val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
           case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, 
pendingEdgeOpt = None, version = newVersion)
@@ -575,8 +575,8 @@ abstract class Storage[Q, R](val graph: Graph,
    * @return
    */
   protected def commitProcess(statusCode: Byte,
-                              squashedEdge: Edge,
-                              fetchedSnapshotEdgeOpt:Option[Edge],
+                              squashedEdge: S2Edge,
+                              fetchedSnapshotEdgeOpt:Option[S2Edge],
                               lockSnapshotEdge: SnapshotEdge,
                               releaseLockSnapshotEdge: SnapshotEdge,
                               edgeMutate: EdgeMutate): Future[Boolean] = {
@@ -588,7 +588,7 @@ abstract class Storage[Q, R](val graph: Graph,
     } yield lockReleased
   }
 
-  case class PartialFailureException(edge: Edge, statusCode: Byte, failReason: 
String) extends NoStackException(failReason)
+  case class PartialFailureException(edge: S2Edge, statusCode: Byte, 
failReason: String) extends NoStackException(failReason)
 
   protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) 
= {
     val msg = Seq(s"[$ret] [$phase]", 
s"${snapshotEdge.toLogString()}").mkString("\n")
@@ -611,8 +611,8 @@ abstract class Storage[Q, R](val graph: Graph,
    * @return
    */
   protected def acquireLock(statusCode: Byte,
-                            squashedEdge: Edge,
-                            fetchedSnapshotEdgeOpt: Option[Edge],
+                            squashedEdge: S2Edge,
+                            fetchedSnapshotEdgeOpt: Option[S2Edge],
                             lockEdge: SnapshotEdge): Future[Boolean] = {
     if (statusCode >= 1) {
       logger.debug(s"skip acquireLock: 
[$statusCode]\n${squashedEdge.toLogString}")
@@ -663,7 +663,7 @@ abstract class Storage[Q, R](val graph: Graph,
    */
   protected def releaseLock(predicate: Boolean,
                             statusCode: Byte,
-                            squashedEdge: Edge,
+                            squashedEdge: S2Edge,
                             releaseLockEdge: SnapshotEdge): Future[Boolean] = {
     if (!predicate) {
       Future.failed(new PartialFailureException(squashedEdge, 3, "predicate 
failed."))
@@ -708,7 +708,7 @@ abstract class Storage[Q, R](val graph: Graph,
    */
   protected def commitIndexEdgeMutations(predicate: Boolean,
                        statusCode: Byte,
-                       squashedEdge: Edge,
+                       squashedEdge: S2Edge,
                        edgeMutate: EdgeMutate): Future[Boolean] = {
     if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, 
"predicate failed."))
     else {
@@ -742,7 +742,7 @@ abstract class Storage[Q, R](val graph: Graph,
    */
   protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
                           statusCode: Byte,
-                          squashedEdge: Edge,
+                          squashedEdge: S2Edge,
                           edgeMutate: EdgeMutate): Future[Boolean] = {
 
     def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
@@ -772,8 +772,8 @@ abstract class Storage[Q, R](val graph: Graph,
 
   /** end of methods for consistency */
 
-  def mutateLog(snapshotEdgeOpt: Option[Edge], edges: Seq[Edge],
-                newEdge: Edge, edgeMutate: EdgeMutate) =
+  def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge],
+                newEdge: S2Edge, edgeMutate: EdgeMutate) =
     Seq("----------------------------------------------",
       s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
       s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
@@ -796,17 +796,17 @@ abstract class Storage[Q, R](val graph: Graph,
           val edge = edgeWithScore.edge
           val score = edgeWithScore.score
 
-          val edgeSnapshot = edge.copyEdge(propsWithTs = 
Edge.propsToState(edge.updatePropsWithTs()))
+          val edgeSnapshot = edge.copyEdge(propsWithTs = 
S2Edge.propsToState(edge.updatePropsWithTs()))
           val reversedSnapshotEdgeMutations = 
snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation
 = SKeyValue.Put))
 
-          val edgeForward = edge.copyEdge(propsWithTs = 
Edge.propsToState(edge.updatePropsWithTs()))
+          val edgeForward = edge.copyEdge(propsWithTs = 
S2Edge.propsToState(edge.updatePropsWithTs()))
           val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap 
{ indexEdge =>
             indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
               buildIncrementsAsync(indexEdge, -1L)
           }
 
           /** reverted direction */
-          val edgeRevert = edge.copyEdge(propsWithTs = 
Edge.propsToState(edge.updatePropsWithTs()))
+          val edgeRevert = edge.copyEdge(propsWithTs = 
S2Edge.propsToState(edge.updatePropsWithTs()))
           val reversedIndexedEdgesMutations = 
edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
             indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
               buildIncrementsAsync(indexEdge, -1L)
@@ -829,8 +829,8 @@ abstract class Storage[Q, R](val graph: Graph,
   /** Parsing Logic: parse from kv from Storage into Edge */
   def toEdge[K: CanSKeyValue](kv: K,
                               queryRequest: QueryRequest,
-                              cacheElementOpt: Option[Edge],
-                              parentEdges: Seq[EdgeWithScore]): Option[Edge] = 
{
+                              cacheElementOpt: Option[S2Edge],
+                              parentEdges: Seq[EdgeWithScore]): Option[S2Edge] 
= {
     logger.debug(s"toEdge: $kv")
 
     try {
@@ -851,7 +851,7 @@ abstract class Storage[Q, R](val graph: Graph,
                                       queryRequest: QueryRequest,
                                       cacheElementOpt: Option[SnapshotEdge] = 
None,
                                       isInnerCall: Boolean,
-                                      parentEdges: Seq[EdgeWithScore]): 
Option[Edge] = {
+                                      parentEdges: Seq[EdgeWithScore]): 
Option[S2Edge] = {
 //        logger.debug(s"SnapshottoEdge: $kv")
     val queryParam = queryRequest.queryParam
     val schemaVer = queryParam.label.schemaVersion
@@ -959,7 +959,7 @@ abstract class Storage[Q, R](val graph: Graph,
 
   /** End Of Parse Logic */
 
-  protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: 
Seq[EdgeWithScore]): Edge = {
+  protected def toRequestEdge(queryRequest: QueryRequest, parentEdges: 
Seq[EdgeWithScore]): S2Edge = {
     val srcVertex = queryRequest.vertex
     val queryParam = queryRequest.queryParam
     val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
@@ -986,7 +986,7 @@ abstract class Storage[Q, R](val graph: Graph,
     }
   }
 
-  protected def fetchSnapshotEdgeInner(edge: Edge): Future[(QueryParam, 
Option[Edge], Option[SKeyValue])] = {
+  protected def fetchSnapshotEdgeInner(edge: S2Edge): Future[(QueryParam, 
Option[S2Edge], Option[SKeyValue])] = {
     /** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
       * so use empty cacheKey.
       * */
@@ -994,7 +994,7 @@ abstract class Storage[Q, R](val graph: Graph,
       direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
       tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
       cacheTTLInMillis = -1)
-    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
+    val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
     val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
     //    val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
 
@@ -1097,15 +1097,15 @@ abstract class Storage[Q, R](val graph: Graph,
   }
 
   //TODO: ServiceColumn do not have durability property yet.
-  def buildDeleteBelongsToId(vertex: Vertex): Seq[SKeyValue] = {
+  def buildDeleteBelongsToId(vertex: S2Vertex): Seq[SKeyValue] = {
     val kvs = vertexSerializer(vertex).toKeyValues
     val kv = kvs.head
     vertex.belongLabelIds.map { id =>
-      kv.copy(qualifier = Bytes.toBytes(Vertex.toPropKey(id)), operation = 
SKeyValue.Delete)
+      kv.copy(qualifier = Bytes.toBytes(S2Vertex.toPropKey(id)), operation = 
SKeyValue.Delete)
     }
   }
 
-  def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = {
+  def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = {
     val storeVertex = 
edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
 
     if (storeVertex) {
@@ -1118,7 +1118,7 @@ abstract class Storage[Q, R](val graph: Graph,
     }
   }
 
-  def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = {
+  def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = {
     edge.property(LabelMeta.degree.name, degreeVal, edge.ts)
     val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
       indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put, durability = indexEdge.label.durability))
@@ -1127,7 +1127,7 @@ abstract class Storage[Q, R](val graph: Graph,
     kvs
   }
 
-  def buildPutsAll(vertex: Vertex): Seq[SKeyValue] = {
+  def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = {
     vertex.op match {
       case d: Byte if d == GraphUtil.operations("delete") => 
vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete))
       case _ => vertexSerializer(vertex).toKeyValues.map(_.copy(operation = 
SKeyValue.Put))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index b0287d5..93b2454 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -89,7 +89,7 @@ object AsynchbaseStorage {
 }
 
 
-class AsynchbaseStorage(override val graph: Graph,
+class AsynchbaseStorage(override val graph: S2Graph,
                         override val config: Config)(implicit ec: 
ExecutionContext)
   extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) {
 
@@ -242,7 +242,7 @@ class AsynchbaseStorage(override val graph: Graph,
    * @param queryRequest
    * @return Scanner or GetRequest with proper setup with StartKey, EndKey, 
RangeFilter.
    */
-  override def buildRequest(queryRequest: QueryRequest, edge: Edge): AsyncRPC 
= {
+  override def buildRequest(queryRequest: QueryRequest, edge: S2Edge): 
AsyncRPC = {
     import Serializable._
     val queryParam = queryRequest.queryParam
     val label = queryParam.label
@@ -424,7 +424,7 @@ class AsynchbaseStorage(override val graph: Graph,
    * @param withWait
    * @return
    */
-  override def incrementCounts(edges: Seq[Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long, Long)]] = {
+  override def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): 
Future[Seq[(Boolean, Long, Long)]] = {
 
     val _client = client(withWait)
     val defers: Seq[Deferred[(Boolean, Long, Long)]] = for {
@@ -517,9 +517,9 @@ class AsynchbaseStorage(override val graph: Graph,
 
 
   /** Asynchbase implementation override default getVertices to use future 
Cache */
-  override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
+  override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
     def fromResult(kvs: Seq[SKeyValue],
-                   version: String): Option[Vertex] = {
+                   version: String): Option[S2Vertex] = {
 
       if (kvs.isEmpty) None
       else vertexDeserializer.fromKeyValues(None, kvs, version, None)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index e11a5f6..5549f4e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -29,10 +29,10 @@ import org.apache.s2graph.core._
 import scala.collection.immutable
 
 object IndexEdgeDeserializable{
-  def getNewInstance(graph: Graph) = new IndexEdgeDeserializable(graph)
+  def getNewInstance(graph: S2Graph) = new IndexEdgeDeserializable(graph)
 }
-class IndexEdgeDeserializable(graph: Graph,
-                              bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[Edge] {
+class IndexEdgeDeserializable(graph: S2Graph,
+                              bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[S2Edge] {
    import StorageDeserializable._
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, 
Boolean, Int)
@@ -41,7 +41,7 @@ class IndexEdgeDeserializable(graph: Graph,
    override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                     _kvs: Seq[T],
                                                     schemaVer: String,
-                                                    cacheElementOpt: 
Option[Edge]): Edge = {
+                                                    cacheElementOpt: 
Option[S2Edge]): S2Edge = {
 
      assert(_kvs.size == 1)
 
@@ -64,7 +64,7 @@ class IndexEdgeDeserializable(graph: Graph,
      val srcVertex = graph.newVertex(srcVertexId, version)
      //TODO:
      val edge = graph.newEdge(srcVertex, null,
-       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
Edge.EmptyState)
+       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
S2Edge.EmptyState)
      var tsVal = version
 
      if (pos == kv.row.length) {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 60f7d80..706d8cb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -26,8 +26,8 @@ import org.apache.s2graph.core.types._
 import org.apache.s2graph.core._
 import scala.collection.immutable
 
-class IndexEdgeDeserializable(graph: Graph,
-                              bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[Edge] {
+class IndexEdgeDeserializable(graph: S2Graph,
+                              bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[S2Edge] {
    import StorageDeserializable._
 
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, 
Boolean, Int)
@@ -68,7 +68,7 @@ class IndexEdgeDeserializable(graph: Graph,
    override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                                     _kvs: Seq[T],
                                                     schemaVer: String,
-                                                    cacheElementOpt: 
Option[Edge]): Edge = {
+                                                    cacheElementOpt: 
Option[S2Edge]): S2Edge = {
      assert(_kvs.size == 1)
 
 //     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
@@ -85,7 +85,7 @@ class IndexEdgeDeserializable(graph: Graph,
      val srcVertex = graph.newVertex(srcVertexId, version)
      //TODO:
      val edge = graph.newEdge(srcVertex, null,
-       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
Edge.EmptyState)
+       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
S2Edge.EmptyState)
      var tsVal = version
 
      val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 6c1906e..f4802c0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelIndex, LabelMe
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, 
SKeyValue, StorageDeserializable}
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, 
SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex}
 
-class SnapshotEdgeDeserializable(graph: Graph) extends 
Deserializable[SnapshotEdge] {
+class SnapshotEdgeDeserializable(graph: S2Graph) extends 
Deserializable[SnapshotEdge] {
 
   def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
     val statusCode = byte >> 4

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 64b2e31..d3dec1e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, 
StorageDeserializable}
 import org.apache.s2graph.core.types.TargetVertexId
-import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex}
 
-class SnapshotEdgeDeserializable(graph: Graph) extends 
Deserializable[SnapshotEdge] {
+class SnapshotEdgeDeserializable(graph: S2Graph) extends 
Deserializable[SnapshotEdge] {
 
   def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
     val statusCode = byte >> 4

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index 6e2311f..ee93505 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -19,20 +19,20 @@
 
 package org.apache.s2graph.core.storage.serde.vertex
 
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.mysqls.{ColumnMeta, Label}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import org.apache.s2graph.core.{Graph, QueryParam, Vertex}
+import org.apache.s2graph.core.{S2Graph, QueryParam, S2Vertex}
 
 import scala.collection.mutable.ListBuffer
 
-class VertexDeserializable(graph: Graph,
-                           bytesToInt: (Array[Byte], Int) => Int = bytesToInt) 
extends Deserializable[Vertex] {
+class VertexDeserializable(graph: S2Graph,
+                           bytesToInt: (Array[Byte], Int) => Int = bytesToInt) 
extends Deserializable[S2Vertex] {
   def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                           _kvs: Seq[T],
                                           version: String,
-                                          cacheElementOpt: Option[Vertex]): 
Vertex = {
+                                          cacheElementOpt: Option[S2Vertex]): 
S2Vertex = {
 
     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
 
@@ -40,7 +40,7 @@ class VertexDeserializable(graph: Graph,
     val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
 
     var maxTs = Long.MinValue
-    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
+    val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike]
     val belongLabelIds = new ListBuffer[Int]
 
     for {
@@ -53,15 +53,18 @@ class VertexDeserializable(graph: Graph,
       val ts = kv.timestamp
       if (ts > maxTs) maxTs = ts
 
-      if (Vertex.isLabelId(propKey)) {
-        belongLabelIds += Vertex.toLabelId(propKey)
+      if (S2Vertex.isLabelId(propKey)) {
+        belongLabelIds += S2Vertex.toLabelId(propKey)
       } else {
         val v = kv.value
         val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
-        propsMap += (propKey -> value)
+        val columnMeta = vertexId.column.metasMap(propKey)
+        propsMap += (columnMeta -> value)
       }
     }
     assert(maxTs != Long.MinValue)
-    graph.newVertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = 
belongLabelIds)
+    val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, 
belongLabelIds = belongLabelIds)
+    S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
+    vertex
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index 77bbb87..1dbcd00 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -19,12 +19,12 @@
 
 package org.apache.s2graph.core.storage.serde.vertex
 
-import org.apache.s2graph.core.Vertex
+import org.apache.s2graph.core.S2Vertex
 import org.apache.s2graph.core.storage.StorageSerializable._
 import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
-import org.apache.s2graph.core.utils.logger
+import scala.collection.JavaConverters._
 
-case class VertexSerializable(vertex: Vertex, intToBytes: Int => Array[Byte] = 
intToBytes) extends Serializable[Vertex] {
+case class VertexSerializable(vertex: S2Vertex, intToBytes: Int => Array[Byte] 
= intToBytes) extends Serializable[S2Vertex] {
 
   override val table = vertex.hbaseTableName.getBytes
   override val ts = vertex.ts
@@ -38,8 +38,11 @@ case class VertexSerializable(vertex: Vertex, intToBytes: 
Int => Array[Byte] = i
   /** vertex override toKeyValues since vertex expect to produce multiple 
sKeyValues */
   override def toKeyValues: Seq[SKeyValue] = {
     val row = toRowKey
-    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield 
intToBytes(k) -> v.bytes
-    val belongsTo = vertex.belongLabelIds.map { labelId => 
intToBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+    val base = for ((k, v) <- vertex.props.asScala ++ 
vertex.defaultProps.asScala) yield {
+      val columnMeta = v.columnMeta
+      intToBytes(columnMeta.seq) -> v.innerVal.bytes
+    }
+    val belongsTo = vertex.belongLabelIds.map { labelId => 
intToBytes(S2Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
     (base ++ belongsTo).map { case (qualifier, value) =>
       SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, 
vertex.ts)
     } toSeq

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
index d90cf8e..ea7aa41 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
@@ -268,6 +268,23 @@ trait CanInnerValLike[A] {
 object CanInnerValLike {
   implicit val encodingVer = "v2"
 
+  def castValue(element: Any, classType: String): Any = {
+    import InnerVal._
+    element match {
+      case bd: BigDecimal =>
+        classType match {
+          case DOUBLE => bd.doubleValue()
+          case FLOAT => bd.floatValue()
+          case LONG => bd.longValue()
+          case INT | "int" => bd.intValue()
+          case SHORT => bd.shortValue()
+          case BYTE => bd.byteValue()
+          case _ => throw new RuntimeException(s"not supported data type: 
$element, $classType")
+        }
+      case _ => element
+//        throw new RuntimeException(s"not supported data type: $element, 
${element.getClass.getCanonicalName}, $classType")
+    }
+  }
   def validate(element: Any, classType: String): Boolean = {
     import InnerVal._
     classType match {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
index a949f3e..eb2d42a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -51,7 +51,7 @@ object VertexId extends HBaseDeserializable {
   }
 }
 
-class VertexId protected (val column: ServiceColumn, val innerId: 
InnerValLike) extends HBaseSerializable {
+class VertexId (val column: ServiceColumn, val innerId: InnerValLike) extends 
HBaseSerializable {
   val storeHash: Boolean = true
   val storeColId: Boolean = true
   val colId = column.id.get

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
deleted file mode 100644
index 55b796d..0000000
--- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta}
-import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
-import org.apache.s2graph.core.utils.logger
-import org.scalatest.FunSuite
-import play.api.libs.json.{JsObject, Json}
-
-class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
-  import Edge._
-  initTests()
-
-  val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "is_blocked", 
1.toByte, "true", "boolean")
-  val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "time", 3.toByte, 
"-1", "long")
-
-//  test("toLogString") {
-//    val testServiceName = serviceNameV2
-//    val testLabelName = labelNameV2
-//    val bulkQueries = List(
-//      ("1445240543366", "update", "{\"is_blocked\":true}"),
-//      ("1445240543362", "insert", "{\"is_hidden\":false}"),
-//      ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"),
-//      ("1445240543363", "delete", "{}"),
-//      ("1445240543365", "update", "{\"time\":1, \"weight\":-10}"))
-//
-//    val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
-//
-//    val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
-//      val properties = fromJsonToProperties(Json.parse(props).as[JsObject])
-//      Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, 
op).toLogString
-//    }).mkString("\n")
-//
-//    val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + 
testLabelName +
-//      "\",\"service\":\"" + testServiceName + "\""
-//    val expected = Seq(
-//      Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"is_blocked\":true}"),
-//      Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"is_hidden\":false}"),
-//      Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"is_hidden\":false,\"weight\":10}"),
-//      Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + 
attachedProps + "}"),
-//      Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"time\":1,\"weight\":-10}")
-//    ).map(_.mkString("\t")).mkString("\n")
-//
-//    assert(bulkEdge === expected)
-//  }
-
-  test("buildOperation") {
-    val schemaVersion = "v2"
-    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
-    val tgtVertex = srcVertex
-
-    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
-
-    val snapshotEdge = None
-    val propsWithTs = Map(timestampProp)
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
-
-    val newVersion = 0L
-
-    val newPropsWithTs = Map(
-      timestampProp,
-      testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, 
schemaVersion), 1)
-    )
-
-    val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, 
propsWithTs, newPropsWithTs)
-    logger.info(edgeMutate.toLogString)
-
-    assert(edgeMutate.newSnapshotEdge.isDefined)
-    assert(edgeMutate.edgesToInsert.nonEmpty)
-    assert(edgeMutate.edgesToDelete.isEmpty)
-  }
-
-  test("buildMutation: snapshotEdge: None with newProps") {
-    val schemaVersion = "v2"
-    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
-    val tgtVertex = srcVertex
-
-    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
-
-    val snapshotEdge = None
-    val propsWithTs = Map(timestampProp)
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
-
-    val newVersion = 0L
-
-    val newPropsWithTs = Map(
-      timestampProp,
-      testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, 
schemaVersion), 1)
-    )
-
-    val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, 
propsWithTs, newPropsWithTs)
-    logger.info(edgeMutate.toLogString)
-
-    assert(edgeMutate.newSnapshotEdge.isDefined)
-    assert(edgeMutate.edgesToInsert.nonEmpty)
-    assert(edgeMutate.edgesToDelete.isEmpty)
-  }
-
-  test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") {
-    val schemaVersion = "v2"
-    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
-    val tgtVertex = srcVertex
-
-    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
-
-    val snapshotEdge = None
-    val propsWithTs = Map(timestampProp)
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
-
-    val newVersion = 0L
-
-    val newPropsWithTs = propsWithTs
-
-    val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, 
propsWithTs, newPropsWithTs)
-    logger.info(edgeMutate.toLogString)
-
-    assert(edgeMutate.newSnapshotEdge.isEmpty)
-    assert(edgeMutate.edgesToInsert.isEmpty)
-    assert(edgeMutate.edgesToDelete.isEmpty)
-  }
-
-  test("buildMutation: All props older than snapshotEdge's LastDeletedAt") {
-    val schemaVersion = "v2"
-    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
-    val tgtVertex = srcVertex
-
-    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
-    val oldPropsWithTs = Map(
-      timestampProp,
-      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
-    )
-
-    val propsWithTs = Map(
-      timestampProp,
-      testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 2),
-      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
-    )
-
-    val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = 
propsWithTs)
-
-    val snapshotEdge = Option(_snapshotEdge)
-
-
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
-
-    val newVersion = 0L
-    val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, 
oldPropsWithTs, propsWithTs)
-    logger.info(edgeMutate.toLogString)
-
-    assert(edgeMutate.newSnapshotEdge.nonEmpty)
-    assert(edgeMutate.edgesToInsert.isEmpty)
-    assert(edgeMutate.edgesToDelete.isEmpty)
-  }
-
-  test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") {
-    val schemaVersion = "v2"
-    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
-    val srcVertex = graph.newVertex(vertexId)
-    val tgtVertex = srcVertex
-
-    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
-    val oldPropsWithTs = Map(
-      timestampProp,
-      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
-    )
-
-    val propsWithTs = Map(
-      timestampProp,
-      testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 4),
-      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
-    )
-
-    val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = 
propsWithTs)
-
-    val snapshotEdge = Option(_snapshotEdge)
-
-    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
-
-    val newVersion = 0L
-    val edgeMutate = Edge.buildMutation(snapshotEdge, requestEdge, newVersion, 
oldPropsWithTs, propsWithTs)
-    logger.info(edgeMutate.toLogString)
-
-    assert(edgeMutate.newSnapshotEdge.nonEmpty)
-    assert(edgeMutate.edgesToInsert.nonEmpty)
-    assert(edgeMutate.edgesToDelete.isEmpty)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index cc08b62..d280570 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -34,14 +34,14 @@ trait IntegrateCommon extends FunSuite with Matchers with 
BeforeAndAfterAll {
 
   import TestUtil._
 
-  var graph: Graph = _
+  var graph: S2Graph = _
   var parser: RequestParser = _
   var management: Management = _
   var config: Config = _
 
   override def beforeAll = {
     config = ConfigFactory.load()
-    graph = new Graph(config)(ExecutionContext.Implicits.global)
+    graph = new S2Graph(config)(ExecutionContext.Implicits.global)
     management = new Management(graph)
     parser = new RequestParser(graph)
     initTestData()
@@ -92,7 +92,7 @@ trait IntegrateCommon extends FunSuite with Matchers with 
BeforeAndAfterAll {
       }
     }
 
-    val vertexPropsKeys = List("age" -> "int", "im" -> "string")
+    val vertexPropsKeys = List("age" -> "integer", "im" -> "string")
 
     vertexPropsKeys.map { case (key, keyType) =>
       Management.addVertexProp(testServiceName, testColumnName, key, keyType)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala
new file mode 100644
index 0000000..3eb04ff
--- /dev/null
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/tinkerpop/S2S2GraphTest.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.Integrate.tinkerpop
+
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{S2Graph, TestCommonWithModels, S2Vertex}
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
+import org.apache.tinkerpop.gremlin.structure.{Edge, Vertex, T}
+import org.scalatest.{FunSuite, Matchers}
+
+
+class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
+
+  import scala.collection.JavaConversions._
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  initTests()
+
+  val g = new S2Graph(config)
+
+  def printEdges(edges: Seq[Edge]): Unit = {
+    edges.foreach { edge =>
+      logger.debug(s"[FetchedEdge]: $edge")
+    }
+  }
+
+  import scala.language.implicitConversions
+
+  def newVertexId(id: Any, label: Label = labelV2) = 
g.newVertexId(label.srcService, label.srcColumn, id)
+
+  def addVertex(id: AnyRef, label: Label = labelV2) =
+    g.addVertex(T.label, label.srcService.serviceName + 
S2Vertex.VertexLabelDelimiter + label.srcColumnName,
+      T.id, id).asInstanceOf[S2Vertex]
+
+  val srcId = Long.box(20)
+  val range = (100 until 110)
+  testData(srcId, range)
+
+  //  val testProps = Seq(
+  //    Prop("affinity_score", "0.0", DOUBLE),
+  //    Prop("is_blocked", "false", BOOLEAN),
+  //    Prop("time", "0", INT),
+  //    Prop("weight", "0", INT),
+  //    Prop("is_hidden", "true", BOOLEAN),
+  //    Prop("phone_number", "xxx-xxx-xxxx", STRING),
+  //    Prop("score", "0.1", FLOAT),
+  //    Prop("age", "10", INT)
+  //  )
+  def testData(srcId: AnyRef, range: Range, label: Label = labelV2) = {
+    val src = addVertex(srcId)
+
+    for {
+      i <- range
+    } {
+      val tgt = addVertex(Int.box(i))
+
+      src.addEdge(labelV2.label, tgt,
+        "age", Int.box(10), 
+        "affinity_score", Double.box(0.1), 
+        "is_blocked", Boolean.box(true),
+        "ts", Long.box(i))
+    }
+  }
+
+  test("test traversal.") {
+    val vertices = g.traversal().V(newVertexId(srcId)).out(labelV2.label).toSeq
+
+    vertices.size should be(range.size)
+    range.reverse.zip(vertices).foreach { case (tgtId, vertex) =>
+      val vertexId = g.newVertexId(labelV2.tgtService, labelV2.tgtColumn, 
tgtId)
+      val expectedId = g.newVertex(vertexId)
+      vertex.asInstanceOf[S2Vertex].innerId should be(expectedId.innerId)
+    }
+  }
+
+  test("test traversal. limit 1") {
+    val vertexIdParams = Seq(newVertexId(srcId))
+    val t: GraphTraversal[Vertex, Double] = g.traversal().V(vertexIdParams: 
_*).outE(labelV2.label).limit(1).values("affinity_score")
+    for {
+      affinityScore <- t
+    } {
+      logger.debug(s"$affinityScore")
+      affinityScore should be (0.1)
+    }
+  }
+  test("test traversal. 3") {
+
+    val l = label
+
+    val srcA = addVertex(Long.box(1), l)
+    val srcB = addVertex(Long.box(2), l)
+    val srcC = addVertex(Long.box(3), l)
+
+    val tgtA = addVertex(Long.box(101), l)
+    val tgtC = addVertex(Long.box(103), l)
+
+    srcA.addEdge(l.label, tgtA)
+    srcA.addEdge(l.label, tgtC)
+    tgtC.addEdge(l.label, srcB)
+    tgtA.addEdge(l.label, srcC)
+
+    val vertexIdParams = Seq(srcA.id)
+    val vertices = g.traversal().V(vertexIdParams: 
_*).out(l.label).out(l.label).toSeq
+    vertices.size should be(2)
+    vertices.foreach { v =>
+      val vertex = v.asInstanceOf[S2Vertex]
+      // TODO: we have too many id. this is ugly and confusing so fix me.
+      vertex.id.innerId == srcB.id.innerId || vertex.id.innerId == 
srcC.id.innerId should be(true)
+      logger.debug(s"[Vertex]: $v")
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
new file mode 100644
index 0000000..94883c9
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.JSONParser._
+import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
+import org.apache.s2graph.core.utils.logger
+import org.scalatest.FunSuite
+import play.api.libs.json.{JsObject, Json}
+
+class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
+  import S2Edge._
+  initTests()
+
+  val testLabelMeta1 = LabelMeta(Option(-1), labelV2.id.get, "is_blocked", 
1.toByte, "true", "boolean")
+  val testLabelMeta3 = LabelMeta(Option(-1), labelV2.id.get, "time", 3.toByte, 
"-1", "long")
+
+//  test("toLogString") {
+//    val testServiceName = serviceNameV2
+//    val testLabelName = labelNameV2
+//    val bulkQueries = List(
+//      ("1445240543366", "update", "{\"is_blocked\":true}"),
+//      ("1445240543362", "insert", "{\"is_hidden\":false}"),
+//      ("1445240543364", "insert", "{\"is_hidden\":false,\"weight\":10}"),
+//      ("1445240543363", "delete", "{}"),
+//      ("1445240543365", "update", "{\"time\":1, \"weight\":-10}"))
+//
+//    val (srcId, tgtId, labelName) = ("1", "2", testLabelName)
+//
+//    val bulkEdge = (for ((ts, op, props) <- bulkQueries) yield {
+//      val properties = fromJsonToProperties(Json.parse(props).as[JsObject])
+//      Edge.toEdge(srcId, tgtId, labelName, "out", properties, ts.toLong, 
op).toLogString
+//    }).mkString("\n")
+//
+//    val attachedProps = "\"from\":\"1\",\"to\":\"2\",\"label\":\"" + 
testLabelName +
+//      "\",\"service\":\"" + testServiceName + "\""
+//    val expected = Seq(
+//      Seq("1445240543366", "update", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"is_blocked\":true}"),
+//      Seq("1445240543362", "insert", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"is_hidden\":false}"),
+//      Seq("1445240543364", "insert", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"is_hidden\":false,\"weight\":10}"),
+//      Seq("1445240543363", "delete", "e", "1", "2", testLabelName, "{" + 
attachedProps + "}"),
+//      Seq("1445240543365", "update", "e", "1", "2", testLabelName, "{" + 
attachedProps + ",\"time\":1,\"weight\":-10}")
+//    ).map(_.mkString("\t")).mkString("\n")
+//
+//    assert(bulkEdge === expected)
+//  }
+
+  test("buildOperation") {
+    val schemaVersion = "v2"
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
+    val tgtVertex = srcVertex
+
+    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
+
+    val snapshotEdge = None
+    val propsWithTs = Map(timestampProp)
+    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
+
+    val newVersion = 0L
+
+    val newPropsWithTs = Map(
+      timestampProp,
+      testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, 
schemaVersion), 1)
+    )
+
+    val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, 
newVersion, propsWithTs, newPropsWithTs)
+    logger.info(edgeMutate.toLogString)
+
+    assert(edgeMutate.newSnapshotEdge.isDefined)
+    assert(edgeMutate.edgesToInsert.nonEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: snapshotEdge: None with newProps") {
+    val schemaVersion = "v2"
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
+    val tgtVertex = srcVertex
+
+    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
+
+    val snapshotEdge = None
+    val propsWithTs = Map(timestampProp)
+    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
+
+    val newVersion = 0L
+
+    val newPropsWithTs = Map(
+      timestampProp,
+      testLabelMeta1 -> InnerValLikeWithTs(InnerVal.withBoolean(false, 
schemaVersion), 1)
+    )
+
+    val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, 
newVersion, propsWithTs, newPropsWithTs)
+    logger.info(edgeMutate.toLogString)
+
+    assert(edgeMutate.newSnapshotEdge.isDefined)
+    assert(edgeMutate.edgesToInsert.nonEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") {
+    val schemaVersion = "v2"
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
+    val tgtVertex = srcVertex
+
+    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
+
+    val snapshotEdge = None
+    val propsWithTs = Map(timestampProp)
+    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
+
+    val newVersion = 0L
+
+    val newPropsWithTs = propsWithTs
+
+    val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, 
newVersion, propsWithTs, newPropsWithTs)
+    logger.info(edgeMutate.toLogString)
+
+    assert(edgeMutate.newSnapshotEdge.isEmpty)
+    assert(edgeMutate.edgesToInsert.isEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: All props older than snapshotEdge's LastDeletedAt") {
+    val schemaVersion = "v2"
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
+    val tgtVertex = srcVertex
+
+    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
+    val oldPropsWithTs = Map(
+      timestampProp,
+      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
+    )
+
+    val propsWithTs = Map(
+      timestampProp,
+      testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 2),
+      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
+    )
+
+    val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = 
propsWithTs)
+
+    val snapshotEdge = Option(_snapshotEdge)
+
+
+    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
+
+    val newVersion = 0L
+    val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, 
newVersion, oldPropsWithTs, propsWithTs)
+    logger.info(edgeMutate.toLogString)
+
+    assert(edgeMutate.newSnapshotEdge.nonEmpty)
+    assert(edgeMutate.edgesToInsert.isEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+
+  test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") {
+    val schemaVersion = "v2"
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", 
schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
+    val tgtVertex = srcVertex
+
+    val timestampProp = LabelMeta.timestamp -> 
InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
+    val oldPropsWithTs = Map(
+      timestampProp,
+      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
+    )
+
+    val propsWithTs = Map(
+      timestampProp,
+      testLabelMeta3 -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 4),
+      LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, 
schemaVersion), 3)
+    )
+
+    val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = 
propsWithTs)
+
+    val snapshotEdge = Option(_snapshotEdge)
+
+    val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, 
labelWithDirV2.dir, propsWithTs = propsWithTs)
+
+    val newVersion = 0L
+    val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, 
newVersion, oldPropsWithTs, propsWithTs)
+    logger.info(edgeMutate.toLogString)
+
+    assert(edgeMutate.newSnapshotEdge.nonEmpty)
+    assert(edgeMutate.edgesToInsert.nonEmpty)
+    assert(edgeMutate.edgesToDelete.isEmpty)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 12eae77..1997354 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -32,13 +32,13 @@ trait TestCommonWithModels {
   import InnerVal._
   import types.HBaseType._
 
-  var graph: Graph = _
+  var graph: S2Graph = _
   var config: Config = _
   var management: Management = _
 
   def initTests() = {
     config = ConfigFactory.load()
-    graph = new Graph(config)(ExecutionContext.Implicits.global)
+    graph = new S2Graph(config)(ExecutionContext.Implicits.global)
     management = new Management(graph)
 
     implicit val session = AutoSession

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
index 9e220cf..30011b1 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/BenchmarkCommon.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.benchmark
 
 import com.typesafe.config.{ConfigFactory, Config}
-import org.apache.s2graph.core.{Management, Graph}
+import org.apache.s2graph.core.{Management, S2Graph$}
 import org.specs2.mutable.Specification
 import scalikejdbc.AutoSession
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index cb6090e..25dd0e4 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -37,7 +37,7 @@ class WhereParserTest extends FunSuite with Matchers with 
TestCommonWithModels {
   val ts = System.currentTimeMillis()
   val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, 
label.schemaVersion)
 
-  def validate(label: Label)(edge: Edge)(sql: String)(expected: Boolean) = {
+  def validate(label: Label)(edge: S2Edge)(sql: String)(expected: Boolean) = {
     def debug(whereOpt: Try[Where]) = {
       println("==================")
       println(s"$whereOpt")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
index e530e65..17ecc87 100644
--- 
a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
+++ 
b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.counter.helper
 
 import com.typesafe.config.Config
 import org.apache
-import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.S2Graph
 import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.counter
 import org.apache.s2graph.counter.config.S2CounterConfig
@@ -37,7 +37,7 @@ class CounterAdmin(config: Config) {
    val s2config = new S2CounterConfig(config)
    val counterModel = new CounterModel(config)
    val graphOp = new GraphOperation(config)
-   val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+   val s2graph = new S2Graph(config)(scala.concurrent.ExecutionContext.global)
    val storageManagement = new org.apache.s2graph.core.Management(s2graph)
 
    def setupCounterOnGraph(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
index c1d25f3..62174ad 100644
--- 
a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
+++ 
b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.counter.core
 
 import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.mysqls.Label
-import org.apache.s2graph.core.{Graph, Management}
+import org.apache.s2graph.core.{S2Graph$, Management}
 import org.apache.s2graph.counter.config.S2CounterConfig
 import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
 import org.apache.s2graph.counter.core.v2.{GraphOperation, RankingStorageGraph}
@@ -87,7 +87,7 @@ class RankingCounterSpec extends Specification with 
BeforeAfterAll {
       }
 
       val graphOp = new GraphOperation(config)
-      val graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+      val graph = new S2Graph(config)(scala.concurrent.ExecutionContext.global)
       val management = new Management(graph)
       management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, 
s"${service}_dev", 1, None, "gz")
       val strJs =

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
index a659ed3..37779dc 100644
--- 
a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
+++ 
b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.counter.loader.core
 
-import org.apache.s2graph.core.{Edge, Graph, GraphUtil}
+import org.apache.s2graph.core.{S2Edge, S2Graph, GraphUtil}
 import org.apache.s2graph.counter.loader.config.StreamingConfig
 import org.apache.s2graph.counter.models.CounterModel
 import org.apache.s2graph.spark.config.S2ConfigFactory
@@ -32,12 +32,12 @@ object CounterEtlFunctions extends Logging {
   lazy val preFetchSize = StreamingConfig.PROFILE_PREFETCH_SIZE
   lazy val config = S2ConfigFactory.config
   lazy val counterModel = new CounterModel(config)
-  lazy val graph = new 
Graph(config)(scala.concurrent.ExecutionContext.Implicits.global)
+  lazy val graph = new 
S2Graph(config)(scala.concurrent.ExecutionContext.Implicits.global)
 
-  def logToEdge(line: String): Option[Edge] = {
+  def logToEdge(line: String): Option[S2Edge] = {
     for {
-      elem <- graph.toGraphElement(line) if elem.isInstanceOf[Edge]
-      edge <- Some(elem.asInstanceOf[Edge]).filter { x =>
+      elem <- graph.toGraphElement(line) if elem.isInstanceOf[S2Edge]
+      edge <- Some(elem.asInstanceOf[S2Edge]).filter { x =>
         filterOps.contains(x.op)
       }
     } yield {
@@ -50,8 +50,8 @@ object CounterEtlFunctions extends Logging {
      * 1427082276804   insert  edge    19073318        
52453027_93524145648511699      story_user_ch_doc_view  {"doc_type" : "l", 
"channel_subscribing" : "y", "view_from" : "feed"}
      */
     for {
-      elem <- graph.toGraphElement(line) if elem.isInstanceOf[Edge]
-      edge <- Some(elem.asInstanceOf[Edge]).filter { x =>
+      elem <- graph.toGraphElement(line) if elem.isInstanceOf[S2Edge]
+      edge <- Some(elem.asInstanceOf[S2Edge]).filter { x =>
         filterOps.contains(x.op)
       }
     } yield {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
----------------------------------------------------------------------
diff --git 
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
 
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
index 8ae1de3..6985758 100644
--- 
a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
+++ 
b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.counter.loader.core
 import com.typesafe.config.ConfigFactory
 import org.apache.s2graph.core.mysqls.{Label, Service}
 import org.apache.s2graph.core.types.HBaseType
-import org.apache.s2graph.core.{Graph, Management}
+import org.apache.s2graph.core.{S2Graph$, Management}
 import org.apache.s2graph.counter.models.DBModel
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
@@ -33,7 +33,7 @@ class CounterEtlFunctionsSpec extends FlatSpec with 
BeforeAndAfterAll with Match
   val cluster = config.getString("hbase.zookeeper.quorum")
   DBModel.initialize(config)
 
-  val graph = new Graph(config)(global)
+  val graph = new S2Graph(config)(global)
   val management = new Management(graph)
 
   override def beforeAll: Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala 
b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index a6f8f5c..a47fda7 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -39,7 +39,7 @@ import org.apache.s2graph.core.rest.RestHandler
 import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
 import org.apache.s2graph.core.utils.Extensions._
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{Graph, PostProcess}
+import org.apache.s2graph.core.{S2Graph, PostProcess}
 import play.api.libs.json._
 
 import scala.collection.mutable
@@ -217,7 +217,7 @@ object NettyServer extends App {
   val maxBodySize = Try(config.getInt("max.body.size")).recover { case _ => 
65536 * 2 }.get
 
   // init s2graph with config
-  val s2graph = new Graph(config)(ec)
+  val s2graph = new S2Graph(config)(ec)
   val rest = new RestHandler(s2graph)(ec)
 
   val deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover 
{ case _ => "release info not found\n" }.get

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
index 692ab1e..e5fc75d 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/Bootstrap.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.Executors
 
 import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{ExceptionHandler, Graph, Management}
+import org.apache.s2graph.core.{ExceptionHandler, S2Graph, Management}
 import org.apache.s2graph.rest.play.actors.QueueActor
 import org.apache.s2graph.rest.play.config.Config
 import org.apache.s2graph.rest.play.controllers.ApplicationController
@@ -36,7 +36,7 @@ import scala.io.Source
 import scala.util.Try
 
 object Global extends WithFilters(new GzipFilter()) {
-  var s2graph: Graph = _
+  var s2graph: S2Graph = _
   var storageManagement: Management = _
   var s2parser: RequestParser = _
   var s2rest: RestHandler = _
@@ -50,7 +50,7 @@ object Global extends WithFilters(new GzipFilter()) {
     val config = Config.conf.underlying
 
     // init s2graph with config
-    s2graph = new Graph(config)(ec)
+    s2graph = new S2Graph(config)(ec)
     storageManagement = new Management(s2graph)
     s2parser = new RequestParser(s2graph) 
     s2rest = new RestHandler(s2graph)(ec)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
index d46d8d2..75ffbc3 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/actors/QueueActor.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import akka.actor._
 import org.apache.s2graph.core.ExceptionHandler._
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphElement}
+import org.apache.s2graph.core.{ExceptionHandler, S2Graph, GraphElement}
 import org.apache.s2graph.rest.play.actors.Protocol.FlushAll
 import org.apache.s2graph.rest.play.config.Config
 import play.api.Play.current
@@ -46,7 +46,7 @@ object QueueActor {
   var router: ActorRef = _
 
   //    Akka.system.actorOf(props(), name = "queueActor")
-  def init(s2: Graph, walLogHandler: ExceptionHandler) = {
+  def init(s2: S2Graph, walLogHandler: ExceptionHandler) = {
     router = Akka.system.actorOf(props(s2, walLogHandler))
   }
 
@@ -56,10 +56,10 @@ object QueueActor {
     Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2)
   }
 
-  def props(s2: Graph, walLogHandler: ExceptionHandler): Props = 
Props(classOf[QueueActor], s2, walLogHandler)
+  def props(s2: S2Graph, walLogHandler: ExceptionHandler): Props = 
Props(classOf[QueueActor], s2, walLogHandler)
 }
 
-class QueueActor(s2: Graph, walLogHandler: ExceptionHandler) extends Actor 
with ActorLogging {
+class QueueActor(s2: S2Graph, walLogHandler: ExceptionHandler) extends Actor 
with ActorLogging {
 
   import Protocol._
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index b1635fb..aed8ced 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -38,7 +38,7 @@ object EdgeController extends Controller {
   import ApplicationController._
   import play.api.libs.concurrent.Execution.Implicits._
 
-  private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
+  private val s2: S2Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = 
org.apache.s2graph.rest.play.Global.s2parser
   private val walLogHandler: ExceptionHandler = 
org.apache.s2graph.rest.play.Global.wallLogHandler
 
@@ -51,9 +51,9 @@ object EdgeController extends Controller {
     val kafkaTopic = toKafkaTopic(graphElem.isAsync)
 
     graphElem match {
-      case v: Vertex =>
+      case v: S2Vertex =>
         enqueue(kafkaTopic, graphElem, tsv)
-      case e: Edge =>
+      case e: S2Edge =>
         e.innerLabel.extraOptions.get("walLog") match {
           case None =>
             enqueue(kafkaTopic, e, tsv)
@@ -73,7 +73,7 @@ object EdgeController extends Controller {
     }
   }
 
-  private def toDeleteAllFailMessages(srcVertices: Seq[Vertex], labels: 
Seq[Label], dir: Int, ts: Long ) = {
+  private def toDeleteAllFailMessages(srcVertices: Seq[S2Vertex], labels: 
Seq[Label], dir: Int, ts: Long ) = {
     for {
       vertex <- srcVertices
       id = vertex.id.toString
@@ -92,7 +92,7 @@ object EdgeController extends Controller {
     val result = s2.mutateElements(elements.map(_._1), true)
     result onComplete { results =>
       results.get.zip(elements).map {
-        case (false, (e: Edge, tsv: String)) =>
+        case (false, (e: S2Edge, tsv: String)) =>
           val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){
             toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), 
e.labelWithDir.dir, e.ts)
           } else{
@@ -267,7 +267,7 @@ object EdgeController extends Controller {
     }
 
     def deleteEach(labels: Seq[Label], direction: String, ids: Seq[JsValue],
-                   ts: Long, vertices: Seq[Vertex]) = {
+                   ts: Long, vertices: Seq[S2Vertex]) = {
 
       val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, 
GraphUtil.directions(direction), ts)
       if (withWait) {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
----------------------------------------------------------------------
diff --git 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
index 72e6e82..43f0b15 100644
--- 
a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
+++ 
b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/VertexController.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.rest.play.controllers
 
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{ExceptionHandler, Graph, GraphExceptions}
+import org.apache.s2graph.core.{ExceptionHandler, S2Graph, GraphExceptions}
 import org.apache.s2graph.rest.play.actors.QueueActor
 import org.apache.s2graph.rest.play.config.Config
 import play.api.libs.json.{JsValue, Json}
@@ -30,7 +30,7 @@ import play.api.mvc.{Controller, Result}
 import scala.concurrent.Future
 
 object VertexController extends Controller {
-  private val s2: Graph = org.apache.s2graph.rest.play.Global.s2graph
+  private val s2: S2Graph = org.apache.s2graph.rest.play.Global.s2graph
   private val requestParser: RequestParser = 
org.apache.s2graph.rest.play.Global.s2parser
   private val walLogHandler: ExceptionHandler = 
org.apache.s2graph.rest.play.Global.wallLogHandler
 

Reply via email to