- define initial version of features.
- setup gremlin-test environment.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d05d8a49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d05d8a49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d05d8a49

Branch: refs/heads/master
Commit: d05d8a49d6baeba5f6a0fe5c29d7b0338c839c11
Parents: b91b839
Author: DO YUNG YOON <[email protected]>
Authored: Sun Jan 15 01:09:19 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Sun Jan 15 01:09:52 2017 +0900

----------------------------------------------------------------------
 s2core/build.sbt                                |   4 +-
 .../org/apache/s2graph/core/JSONParser.scala    |  10 +-
 .../org/apache/s2graph/core/Management.scala    |  37 ++-
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 101 +++++---
 .../scala/org/apache/s2graph/core/S2Graph.scala | 253 ++++++++++++++++---
 .../org/apache/s2graph/core/S2Property.scala    |  60 ++++-
 .../org/apache/s2graph/core/S2Vertex.scala      | 107 ++++++--
 .../apache/s2graph/core/S2VertexProperty.scala  |  27 +-
 .../core/features/S2DataTypeFeatures.scala      |  43 ++++
 .../s2graph/core/features/S2EdgeFeatures.scala  |  11 +
 .../core/features/S2EdgePropertyFeatures.scala  |   7 +
 .../core/features/S2ElementFeatures.scala       |  23 ++
 .../s2graph/core/features/S2GraphFeatures.scala |  19 ++
 .../core/features/S2PropertyFeatures.scala      |   7 +
 .../core/features/S2VariableFeatures.scala      |   7 +
 .../s2graph/core/features/S2Variables.scala     |   6 +
 .../core/features/S2VertexFeatures.scala        |  18 ++
 .../features/S2VertexPropertyFeatures.scala     |  24 ++
 .../apache/s2graph/core/mysqls/ColumnMeta.scala |  24 +-
 .../apache/s2graph/core/mysqls/LabelMeta.scala  |   1 +
 .../org/apache/s2graph/core/mysqls/Model.scala  |  59 ++++-
 .../apache/s2graph/core/mysqls/Service.scala    |   4 +-
 .../s2graph/core/mysqls/ServiceColumn.scala     |  17 +-
 .../apache/s2graph/core/storage/Storage.scala   |   8 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |  87 ++++++-
 .../tall/IndexEdgeDeserializable.scala          |  14 +-
 .../wide/IndexEdgeDeserializable.scala          |  14 +-
 .../apache/s2graph/core/types/VertexId.scala    |   8 +-
 .../s2graph/core/utils/SafeUpdateCache.scala    |   2 +
 .../core/Integrate/tinkerpop/S2GraphTest.scala  | 130 ----------
 .../s2graph/core/tinkerpop/S2GraphData.scala    |  12 +
 .../core/tinkerpop/S2GraphProvider.scala        | 220 ++++++++++++++++
 .../S2GraphStructureIntegrateTest.scala         |  13 +
 .../S2GraphStructureStandardTest.scala          |  16 ++
 .../core/tinkerpop/structure/S2GraphTest.scala  | 187 ++++++++++++++
 .../counter/core/RankingCounterSpec.scala       |   2 +-
 scalastyle-config.xml                           | 117 ---------
 37 files changed, 1290 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 6434acc..9ea975c 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -43,8 +43,10 @@ libraryDependencies ++= Seq(
   "org.hbase" % "asynchbase" % "1.7.2" excludeLogging(),
   "net.bytebuddy" % "byte-buddy" % "1.4.26",
   "org.apache.tinkerpop" % "gremlin-core" % tinkerpopVersion,
+  "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test",
   "org.scalatest" %% "scalatest" % "2.2.4" % "test",
-  "org.specs2" %% "specs2-core" % specs2Version % "test"
+  "org.specs2" %% "specs2-core" % specs2Version % "test",
+  "mysql" % "mysql-connector-java" % "5.1.40"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index 2effaf1..9d10dc7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -180,35 +180,43 @@ object JSONParser {
     val dType = InnerVal.toInnerDataType(dataType)
     val isNumeric = isNumericType(dType)
     any match {
+      case a: InnerValLike => a
       case n: BigDecimal =>
         if (isNumeric) InnerVal.withNumber(n, version)
         else if (dType == InnerVal.STRING) InnerVal.withStr(n.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = BigDecimal, 
[DataType]: $dataType, [Input]: $any")
       case l: Long =>
         if (isNumeric) InnerVal.withLong(l, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(l.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Long, 
[DataType]: $dataType, [Input]: $any")
       case i: Int =>
         if (isNumeric) InnerVal.withInt(i, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(i.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Int, 
[DataType]: $dataType, [Input]: $any")
       case sh: Short =>
         if (isNumeric) InnerVal.withInt(sh.toInt, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(sh.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Short, 
[DataType]: $dataType, [Input]: $any")
       case b: Byte =>
         if (isNumeric) InnerVal.withInt(b.toInt, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(b.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Byte, 
[DataType]: $dataType, [Input]: $any")
       case f: Float =>
         if (isNumeric) InnerVal.withFloat(f, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(f.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Float, 
[DataType]: $dataType, [Input]: $any")
       case d: Double =>
         if (isNumeric) InnerVal.withDouble(d, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(d.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Double, 
[DataType]: $dataType, [Input]: $any")
       case bl: Boolean =>
         if (dType == InnerVal.BOOLEAN) InnerVal.withBoolean(bl, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(bl.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = Boolean, 
[DataType]: $dataType, [Input]: $any")
       case _s: String =>
         if (isNumeric) {
           try {
-            val s = TemplateHelper.replaceVariable(System.currentTimeMillis(), 
_s)
+            val s = TemplateHelper.replaceVariable(System.currentTimeMillis(), 
_s.toString)
             InnerVal.withNumber(BigDecimal(s), version)
           } catch {
             case e: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 064a3d1..63a1727 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -77,15 +77,15 @@ object Management {
                           schemaVersion: String = DEFAULT_VERSION) = {
 
     Model withTx { implicit session =>
-      val serviceOpt = Service.findByName(serviceName)
+      val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName 
has not been created.")
         case Some(service) =>
-          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, 
columnName, Some(columnType), schemaVersion)
+          val serviceColumn = ServiceColumn.findOrInsert(service.id.get, 
columnName, Some(columnType), schemaVersion, useCache = false)
           for {
             Prop(propName, defaultValue, dataType) <- props
           } yield {
-            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType)
+            ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType, 
useCache = false)
           }
       }
     }
@@ -278,7 +278,7 @@ class Management(graph: S2Graph) {
                     compressionAlgorithm: String = 
DefaultCompressionAlgorithm): Try[Service] = {
 
     Model withTx { implicit session =>
-      val service = Service.findOrInsert(serviceName, cluster, hTableName, 
preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm)
+      val service = Service.findOrInsert(serviceName, cluster, hTableName, 
preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, 
useCache = false)
       /** create hbase table for service */
       graph.getStorage(service).createTable(service.cluster, 
service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, 
compressionAlgorithm)
       service
@@ -297,13 +297,13 @@ class Management(graph: S2Graph) {
                   serviceName: String,
                   indices: Seq[Index],
                   props: Seq[Prop],
-                  consistencyLevel: String,
-                  hTableName: Option[String],
-                  hTableTTL: Option[Int],
+                  consistencyLevel: String = "weak",
+                  hTableName: Option[String] = None,
+                  hTableTTL: Option[Int] = None,
                   schemaVersion: String = DEFAULT_VERSION,
-                  isAsync: Boolean,
+                  isAsync: Boolean = false,
                   compressionAlgorithm: String = "gz",
-                  options: Option[String]): Try[Label] = {
+                  options: Option[String] = None): Try[Label] = {
 
     if (label.length > LABEL_NAME_MAX_LENGTH ) throw new 
LabelNameTooLongException(s"Label name ${label} too long.( max length : 
${LABEL_NAME_MAX_LENGTH}} )")
     if (hTableName.isEmpty && hTableTTL.isDefined) throw new 
RuntimeException("if want to specify ttl, give hbaseTableName also")
@@ -355,5 +355,24 @@ class Management(graph: S2Graph) {
     storage.info
   }
 
+  def truncateStorage(labelName: String): Unit = {
+    Try(Label.findByName(labelName, useCache = false)).map { labelOpt =>
+      labelOpt.map { label =>
+        val storage = graph.getStorage(label)
+        val zkAddr = label.service.cluster
+        storage.truncateTable(zkAddr, label.hbaseTableName)
+      }
+    }
+  }
+
+  def deleteStorage(labelName: String): Unit = {
+    Try(Label.findByName(labelName, useCache = false)).map { labelOpt =>
+      labelOpt.map { label =>
+        val storage = graph.getStorage(label)
+        val zkAddr = label.service.cluster
+        storage.deleteTable(zkAddr, label.hbaseTableName)
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 7859218..e953718 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -22,17 +22,18 @@ package org.apache.s2graph.core
 import java.util
 import java.util.function.{Consumer, BiConsumer}
 
-import org.apache.s2graph.core.S2Edge.{Props, State}
+import org.apache.s2graph.core.S2Edge.{State, Props}
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, 
ServiceColumn}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.{Edge, Graph, Vertex, Direction, 
Property}
-import play.api.libs.json.{JsNumber, JsObject, Json}
-
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory
+import org.apache.tinkerpop.gremlin.structure.{Graph, Vertex, Edge, Property, 
Direction}
+import play.api.libs.json.{Json, JsNumber, JsObject}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
+import scala.concurrent.Await
 import scala.util.hashing.MurmurHash3
 
 object SnapshotEdge {
@@ -423,21 +424,28 @@ case class S2Edge(innerGraph: S2Graph,
 
   //    def relatedEdges = List(this)
 
+  private def getServiceColumn(vertex: S2Vertex, defaultServiceColumn: 
ServiceColumn) =
+      if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else 
vertex.id.column
+
   def srcForVertex = {
     val belongLabelIds = Seq(labelWithDir.labelId)
     if (labelWithDir.dir == GraphUtil.directions("in")) {
-      innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), 
tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
+      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), 
tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
     } else {
-      innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), 
srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
+      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), 
srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
     }
   }
 
   def tgtForVertex = {
     val belongLabelIds = Seq(labelWithDir.labelId)
     if (labelWithDir.dir == GraphUtil.directions("in")) {
-      innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), 
srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
+      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), 
srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
     } else {
-      innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), 
tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
+      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), 
tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
     }
   }
 
@@ -481,7 +489,7 @@ case class S2Edge(innerGraph: S2Graph,
 
 //    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, 
GraphUtil.directions("out"))
 
-    property(LabelMeta.timestamp.name, ts, ts)
+    propertyInner(LabelMeta.timestamp.name, ts, ts)
     val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
       GraphUtil.directions("out"), op, version, propsWithTs,
       pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = 
lockTs, tsInnerValOpt = tsInnerValOpt)
@@ -534,15 +542,20 @@ case class S2Edge(innerGraph: S2Graph,
   }
 
   override def equals(other: Any): Boolean = other match {
-    case e: Edge => e.id().equals(e.id())
+    case e: Edge => id().equals(e.id())
     case _ => false
   }
 
-  override def toString(): String = {
-    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, 
"label" -> labelName, "direction" -> direction,
-      "operation" -> operation, "version" -> version, "props" -> 
propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
-      "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, 
"statusCode" -> statusCode, "lockTs" -> lockTs
-    ).toString
+//  override def toString(): String = {
+//    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> 
tgtVertex.toString, "label" -> labelName, "direction" -> direction,
+//      "operation" -> operation, "version" -> version, "props" -> 
propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
+//      "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, 
"statusCode" -> statusCode, "lockTs" -> lockTs
+//    ).toString
+//  }
+
+  override def toString: String = {
+    // E + L_BRACKET + edge.id() + R_BRACKET + L_BRACKET + 
edge.outVertex().id() + DASH + edge.label() + ARROW + edge.inVertex().id() + 
R_BRACKET;
+    s"e[${id}][${srcForVertex.id}-${innerLabel.label}->${tgtForVertex.id}]"
   }
 
   def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
@@ -564,14 +577,14 @@ case class S2Edge(innerGraph: S2Graph,
     val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, 
op, version, S2Edge.EmptyProps,
       parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, 
tsInnerValOpt)
     S2Edge.fillPropsWithTs(edge, propsWithTs)
-    edge.property(LabelMeta.timestamp.name, ts, ts)
+    edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
     edge
   }
 
   def copyEdgeWithState(state: State, ts: Long): S2Edge = {
     val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
     S2Edge.fillPropsWithTs(newEdge, state)
-    newEdge.property(LabelMeta.timestamp.name, ts, ts)
+    newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
     newEdge
   }
 
@@ -584,8 +597,14 @@ case class S2Edge(innerGraph: S2Graph,
   override def vertices(direction: Direction): util.Iterator[structure.Vertex] 
= {
     val arr = new util.ArrayList[Vertex]()
     direction match {
-      case Direction.OUT => arr.add(srcVertex)
-      case Direction.IN => arr.add(tgtVertex)
+      case Direction.OUT =>
+        val newVertexId = 
VertexId(ServiceColumn.findById(srcForVertex.id.colId), srcForVertex.innerId)
+        arr.add(srcVertex.copy(id = newVertexId))
+//        arr.add(srcVertex)
+      case Direction.IN =>
+        val newVertexId = 
VertexId(ServiceColumn.findById(tgtForVertex.id.colId), tgtForVertex.innerId)
+        arr.add(tgtVertex.copy(id = newVertexId))
+//        arr.add(tgtVertex)
       case _ =>
         arr.add(srcVertex)
         arr.add(tgtVertex)
@@ -600,35 +619,57 @@ case class S2Edge(innerGraph: S2Graph,
   }
 
   override def property[V](key: String): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new 
RuntimeException(s"$key is not configured on Edge."))
+    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new 
java.lang.IllegalStateException(s"$key is not configured on Edge."))
     if (propsWithTs.containsKey(key)) 
propsWithTs.get(key).asInstanceOf[Property[V]]
     else {
       val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
-      property(key, default.innerVal.value, 
default.ts).asInstanceOf[Property[V]]
+      propertyInner(key, default.innerVal.value, 
default.ts).asInstanceOf[Property[V]]
     }
   }
 
+  // just for tinkerpop: save to storage, do not use for internal
   override def property[V](key: String, value: V): Property[V] = {
-    property(key, value, System.currentTimeMillis())
+    S2Property.assertValidProp(key, value)
+
+    val v = propertyInner(key, value, System.currentTimeMillis())
+    val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 
1).getOrElse(System.currentTimeMillis())
+    val newEdge = this.copyEdge(ts = newTs)
+
+    Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), 
innerGraph.WaitTimeout)
+
+    v
   }
 
-  def property[V](key: String, value: V, ts: Long): Property[V] = {
+  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
     val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new 
RuntimeException(s"$key is not configured on Edge."))
     val newProp = new S2Property[V](this, labelMeta, key, value, ts)
     propsWithTs.put(key, newProp)
     newProp
   }
 
-  override def remove(): Unit = {}
+  override def remove(): Unit =  {
+    if (graph.features().edge().supportsRemoveEdges()) {
+      // remove edge
+    } else {
+      throw Edge.Exceptions.edgeRemovalNotSupported()
+    }
+  }
 
   override def graph(): Graph = innerGraph
 
-  override def id(): AnyRef = EdgeId(srcVertex.innerId, tgtVertex.innerId, 
label(), direction)
+  override def id(): AnyRef = {
+    // NOTE: xxxForVertex makes direction to be "out"
+    if (this.innerLabel.consistencyLevel == "strong") {
+      EdgeId(srcForVertex.innerId, tgtForVertex.innerId, label(), "out", 0)
+    } else {
+      EdgeId(srcForVertex.innerId, tgtForVertex.innerId, label(), "out", ts)
+    }
+  }
 
   override def label(): String = innerLabel.label
 }
 
-case class EdgeId(srcVertexId: InnerValLike, tgtVertexId: InnerValLike, 
labelName: String, direction: String)
+case class EdgeId(srcVertexId: InnerValLike, tgtVertexId: InnerValLike, 
labelName: String, direction: String, ts: Long)
 
 object EdgeMutate {
 
@@ -724,7 +765,7 @@ object S2Edge {
     state.foreach { case (k, v) => indexEdge.property(k.name, 
v.innerVal.value, v.ts) }
   }
   def fillPropsWithTs(edge: S2Edge, state: State): Unit = {
-    state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, 
v.ts) }
+    state.foreach { case (k, v) => edge.propertyInner(k.name, 
v.innerVal.value, v.ts) }
   }
 
   def propsToState(props: Props): State = {
@@ -735,7 +776,7 @@ object S2Edge {
 
   def stateToProps(edge: S2Edge, state: State): Props = {
     state.foreach { case (k, v) =>
-      edge.property(k.name, v.innerVal.value, v.ts)
+      edge.propertyInner(k.name, v.innerVal.value, v.ts)
     }
     edge.propsWithTs
   }
@@ -878,7 +919,7 @@ object S2Edge {
               propsWithTs = S2Edge.EmptyProps,
               op = GraphUtil.defaultOpByte
             )
-            newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, 
v.innerVal.value, v.ts) }
+            newPropsWithTs.foreach { case (k, v) => 
newEdge.propertyInner(k.name, v.innerVal.value, v.ts) }
 
             newEdge.relatedEdges.flatMap { relEdge => 
relEdge.edgesWithIndexValid }
           }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 62e8098..bd6c45a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -20,10 +20,11 @@
 package org.apache.s2graph.core
 
 import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.{Executors, TimeUnit}
 
 import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.commons.configuration.Configuration
+import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, 
LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.mysqls._
@@ -33,14 +34,16 @@ import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
 import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.Graph.Variables
+import org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgeFeatures
+import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
-import org.apache.tinkerpop.gremlin.structure.{Edge, Graph, T, Transaction}
+import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, Property, 
T, Transaction, Vertex}
 import play.api.libs.json.{JsObject, Json}
 
 import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.concurrent._
 import scala.concurrent.duration.Duration
@@ -52,7 +55,6 @@ object S2Graph {
   type HashKey = (Int, Int, Int, Int, Boolean)
   type FilterHashKey = (Int, Int)
 
-
   val DefaultScore = 1.0
 
   private val DefaultConfigs: Map[String, AnyRef] = Map(
@@ -60,8 +62,10 @@ object S2Graph {
     "hbase.table.name" -> "s2graph",
     "hbase.table.compression.algorithm" -> "gz",
     "phase" -> "dev",
-    "db.default.driver" ->  "org.h2.Driver",
-    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+//    "db.default.driver" ->  "org.h2.Driver",
+//    "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
+    "db.default.driver" -> "com.mysql.jdbc.Driver",
+    "db.default.url" -> "jdbc:mysql://default:3306/graph_dev",
     "db.default.password" -> "graph",
     "db.default.user" -> "graph",
     "cache.max.size" -> java.lang.Integer.valueOf(10000),
@@ -92,7 +96,34 @@ object S2Graph {
 
   var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs)
 
+  def toTypeSafeConfig(configuration: Configuration): Config = {
+    val m = new mutable.HashMap[String, AnyRef]()
+    for {
+      key <- configuration.getKeys
+      value = configuration.getProperty(key)
+    } {
+      m.put(key, value)
+    }
+    val config  = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
+    config
+  }
 
+  def fromTypeSafeConfig(config: Config): Configuration = {
+    val configuration = new BaseConfiguration()
+    for {
+      e <- config.entrySet()
+    } {
+      configuration.setProperty(e.getKey, e.getValue.unwrapped())
+    }
+    configuration
+  }
+
+  def open(configuration: Configuration): S2Graph = {
+    val numOfThread = Runtime.getRuntime.availableProcessors()
+    val threadPool = Executors.newFixedThreadPool(numOfThread)
+    val ec = ExecutionContext.fromExecutor(threadPool)
+    new S2Graph(configuration)(ec)
+  }
 
   def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): 
Storage[_, _] = {
     val storageBackend = config.getString("s2graph.storage.backend")
@@ -497,13 +528,62 @@ object S2Graph {
     }
     results
   }
-
 }
 
[email protected](Graph.OptIn.SUITE_STRUCTURE_STANDARD)
[email protected](value = Array(
+// passed
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.FeatureSupportTest", 
method="*", reason="no"), // pass
+  new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.PropertyTest", 
method="*", reason="no"), // pass
+
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexPropertyTest", 
method="*", reason="no"), // pass
+  new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VertexTest", 
method="*", reason="no"), // pss
+  new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.EdgeTest", 
method="*", reason="no"), // pass
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphConstructionTest",
 method="*", reason="no"), // pass
+
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest",
 method="*", reason="no"), // pass
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexTest",
 method="*", reason="no"), // pass one error
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedGraphTest",
 method="*", reason="no"), // pass all ignored
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest",
 method="*", reason="no"), // pass
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexPropertyTest",
 method="*", reason="no"), // pass
+
+//  new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.GraphTest", 
method="*", reason="no"), // pass
+
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest",
 method="*", reason="no"), // pass
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceGraphTest",
 method="*", reason="no"), // pass
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexPropertyTest",
 method="*", reason="no"), // pass
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceVertexTest",
 method="*", reason="no"), // pass
+
+  // not yet supported
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.SerializationTest", 
method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.TransactionTest", 
method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.VariablesTest", 
method="*", reason="no"),
+
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoCustomTest", 
method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", 
method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", 
method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoPropertyTest", 
method="*", reason="no"),
+  new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoTest", 
method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", 
method="*", reason="no"),
+
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.util.star.StarGraphTest",
 method="*", reason="no"),
+
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest",
 method="*", reason="no"),
+  new 
Graph.OptOut(test="org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest",
 method="*", reason="no")
+))
 class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends 
Graph {
 
   import S2Graph._
 
+  private var apacheConfiguration: Configuration = _
+
+  def this(apacheConfiguration: Configuration)(ec: ExecutionContext) = {
+    this(S2Graph.toTypeSafeConfig(apacheConfiguration))(ec)
+    this.apacheConfiguration = apacheConfiguration
+  }
+
+  private val running = new AtomicBoolean(true)
+
   val config = _config.withFallback(S2Graph.DefaultConfig)
 
   Model.apply(config)
@@ -522,6 +602,9 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
   val WaitTimeout = Duration(60, TimeUnit.SECONDS)
   val scheduledEx = 
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
 
+  val management = new Management(this)
+  def getManagement() = management
+
   private def confWithFallback(conf: Config): Config = {
     conf.withFallback(config)
   }
@@ -572,6 +655,32 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
     (k, v) = (entry.getKey, entry.getValue)
   } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}")
 
+  /* TODO */
+  val DefaultService = management.createService("_s2graph", "localhost", 
"s2graph", 0, None).get
+  val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, 
"vertex", Some("string"), HBaseType.DEFAULT_VERSION, useCache = false)
+  val DefaultColumnMetas = {
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "name", "string", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "age", "integer", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "lang", "string", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "oid", "integer", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "communityIndex", "integer", 
useCache = false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "testing", "string", 
useCache = false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "string", "string", useCache 
= false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "boolean", "boolean", 
useCache = false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "long", "long", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "float", "float", useCache = 
false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "double", "double", useCache 
= false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "integer", "integer", 
useCache = false)
+    ColumnMeta.findOrInsert(DefaultColumn.id.get, "aKey", "string", useCache = 
false)
+  }
+
+  val DefaultLabel = management.createLabel("_s2graph", 
DefaultService.serviceName, DefaultColumn.columnName, DefaultColumn.columnType,
+    DefaultService.serviceName, DefaultColumn.columnName, 
DefaultColumn.columnType, true, DefaultService.serviceName, Nil, Nil, "weak", 
None, None,
+    options = Option("""{"skipReverse": true}""")
+  )
+
   def getStorage(service: Service): Storage[_, _] = {
     storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
   }
@@ -1052,10 +1161,14 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
     storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = 
true)
   }
 
-  def shutdown(): Unit = {
-    flushStorage()
-    Model.shutdown()
-  }
+  def isRunning(): Boolean = running.get()
+
+  def shutdown(modelDataDelete: Boolean = false): Unit =
+    if (running.compareAndSet(true, false)) {
+      flushStorage()
+      Model.shutdown(modelDataDelete)
+      defaultStorage.shutdown()
+    }
 
   def toGraphElement(s: String, labelMapping: Map[String, String] = 
Map.empty): Option[GraphElement] = Try {
     val parts = GraphUtil.split(s)
@@ -1144,11 +1257,15 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
                ts: Long = System.currentTimeMillis(),
                operation: String = "insert"): S2Vertex = {
 
-    val service = Service.findByName(serviceName).getOrElse(throw new 
RuntimeException(s"$serviceName is not found."))
-    val column = ServiceColumn.find(service.id.get, 
columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+    val service = Service.findByName(serviceName).getOrElse(throw new 
java.lang.IllegalArgumentException(s"$serviceName is not found."))
+    val column = ServiceColumn.find(service.id.get, 
columnName).getOrElse(throw new 
java.lang.IllegalArgumentException(s"$columnName is not found."))
     val op = GraphUtil.toOp(operation).getOrElse(throw new 
RuntimeException(s"$operation is not supported."))
 
-    val srcVertexId = VertexId(column, toInnerVal(id, column.columnType, 
column.schemaVersion))
+    val srcVertexId = id match {
+      case vid: VertexId => id.asInstanceOf[VertexId]
+      case _ => VertexId(column, toInnerVal(id, column.columnType, 
column.schemaVersion))
+    }
+
     val propsInner = column.propsToInnerVals(props) ++
       Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
 
@@ -1196,8 +1313,21 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
               statusCode: Byte = 0,
               lockTs: Option[Long] = None,
               tsInnerValOpt: Option[InnerValLike] = None): S2Edge = {
-    val edge = new S2Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, 
version, S2Edge.EmptyProps,
-      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, 
tsInnerValOpt)
+    val edge = S2Edge(
+      this,
+      srcVertex,
+      tgtVertex,
+      innerLabel,
+      dir,
+      op,
+      version,
+      S2Edge.EmptyProps,
+      parentEdges,
+      originalEdgeOpt,
+      pendingEdgeOpt,
+      statusCode,
+      lockTs,
+      tsInnerValOpt)
     S2Edge.fillPropsWithTs(edge, propsWithTs)
     edge
   }
@@ -1356,9 +1486,26 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
       //TODO: default storage need to be fixed.
       Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
     } else {
-      val vertices = for {
-        vertexId <- vertexIds if vertexId.isInstanceOf[VertexId]
-      } yield newVertex(vertexId.asInstanceOf[VertexId])
+      val (vIds, stringIds) = vertexIds.partition(_.isInstanceOf[VertexId])
+      val verticesFromIds = vIds.map(vertexId => 
newVertex(vertexId.asInstanceOf[VertexId]))
+      val verticesFromString = stringIds.flatMap { vId =>
+        if (vId.toString.contains(S2Vertex.VertexLabelDelimiter)) {
+          val Array(serviceName, columnName, id) =
+            if (vId.toString.take(2).mkString("") == "v[") 
vId.toString.drop(2).init.split(S2Vertex.VertexLabelDelimiter)
+            else {
+              if (vId.toString.contains(S2Vertex.VertexLabelDelimiter)) {
+                vId.toString.split(S2Vertex.VertexLabelDelimiter)
+              } else {
+                Array(DefaultService.serviceName, DefaultColumn.columnName, 
vId.toString)
+              }
+            }
+
+          Seq(toVertex(serviceName, columnName, id))
+        } else {
+          Nil
+        }
+      }
+      val vertices = verticesFromIds ++ verticesFromString
 
       if (fetchVertices) {
         val future = getVertices(vertices).map { vs =>
@@ -1375,7 +1522,9 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
 
   override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
     if (edgeIds.isEmpty) {
-      Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
+      // FIXME
+      val edges = Await.result(defaultStorage.fetchEdgesAll(), 
WaitTimeout).iterator
+      edges.filterNot(_.isDegree).filterNot(_.direction == "in")
     } else {
       Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
     }
@@ -1395,24 +1544,31 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
       ls.iterator()
     }
   }
-  override def tx(): Transaction = ???
-
-
+  override def tx(): Transaction = {
+    if (!features.graph.supportsTransactions) throw 
Graph.Exceptions.transactionsNotSupported
+    ???
+  }
 
   override def variables(): Variables = ???
 
-  override def configuration(): Configuration = ???
+  override def configuration(): Configuration = apacheConfiguration
 
   override def addVertex(kvs: AnyRef*): structure.Vertex = {
-    val kvsMap = ElementHelper.asMap(kvs: _*).asScala.toMap
-    val id = kvsMap.getOrElse(T.id.toString, throw new RuntimeException("T.id 
is required."))
-    val serviceColumnNames = kvsMap.getOrElse(T.label.toString, throw new 
RuntimeException("ServiceName::ColumnName is required.")).toString
+    if (!features().vertex().supportsUserSuppliedIds() && kvs.contains(T.id)) {
+      throw Vertex.Exceptions.userSuppliedIdsNotSupported
+    }
+    val kvsMap = S2Property.kvsToProps(kvs)
+    val id = kvsMap.getOrElse(T.id.toString, Random.nextLong)
+
+    val serviceColumnNames = kvsMap.getOrElse(T.label.toString, 
DefaultColumn.columnName).toString
+
     val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
-    if (names.length != 2) throw new RuntimeException("malformed data on 
vertex label.")
-    val serviceName = names(0)
-    val columnName = names(1)
+    val (serviceName, columnName) =
+      if (names.length == 1) (DefaultService.serviceName, names(0))
+      else throw new RuntimeException("malformed data on vertex label.")
 
     val vertex = toVertex(serviceName, columnName, id, kvsMap)
+
     val future = mutateVertices(Seq(vertex), withWait = true).map { vs =>
       if (vs.forall(identity)) vertex
       else throw new RuntimeException("addVertex failed.")
@@ -1433,11 +1589,46 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
     Await.result(future, WaitTimeout)
   }
 
+  def addVertex(vertex: S2Vertex): S2Vertex = {
+    val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
+      if (rets.forall(identity)) vertex
+      else throw new RuntimeException("addVertex failed.")
+    }
+    Await.result(future, WaitTimeout)
+  }
+
   override def close(): Unit = {
     shutdown()
   }
 
   override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
 
-  override def compute(): GraphComputer = ???
+  override def compute(): GraphComputer = {
+    if (!features.graph.supportsComputer) {
+      throw Graph.Exceptions.graphComputerNotSupported
+    }
+    ???
+  }
+
+  class S2GraphFeatures extends Features {
+    import org.apache.s2graph.core.{features => FS}
+    override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures
+
+    override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures
+
+    override def supports(featureClass: Class[_ <: Features.FeatureSet], 
feature: String): Boolean =
+      super.supports(featureClass, feature)
+
+    override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures
+
+    override def toString: String = {
+      
s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}"
+    }
+  }
+
+  private val s2Features = new S2GraphFeatures
+
+  override def features() = s2Features
+
+  override def toString(): String = "[s2graph]"
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 6a47e46..b5fc110 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -21,11 +21,51 @@ package org.apache.s2graph.core
 
 
 import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, CanInnerValLike}
-import org.apache.tinkerpop.gremlin.structure.{Property}
+import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLikeWithTs}
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
+import org.apache.tinkerpop.gremlin.structure._
 
 import scala.util.hashing.MurmurHash3
 
+object S2Property {
+  def kvsToProps(kvs: Seq[AnyRef]): Map[String, AnyRef] = {
+    import scala.collection.JavaConverters._
+
+    ElementHelper.legalPropertyKeyValueArray(kvs: _*)
+    val keySet = collection.mutable.Set[Any]()
+    val kvsList = ElementHelper.asPairs(kvs: _*).asScala
+    var result = Map[String, AnyRef]()
+    kvsList.foreach { pair =>
+      val key = pair.getValue0
+      val value = pair.getValue1
+      ElementHelper.validateProperty(key, value)
+      if (keySet.contains(key)) throw 
VertexProperty.Exceptions.multiPropertiesNotSupported
+
+      assertValidProp(key, value)
+
+      keySet.add(key)
+      result = result + (key -> value)
+    }
+
+    result
+  }
+
+  def assertValidProp[A](key: Any, value: A): Unit = {
+    if (key == null) throw Property.Exceptions.propertyKeyCanNotBeEmpty()
+    if (!key.isInstanceOf[String]) throw 
Element.Exceptions.providedKeyValuesMustHaveALegalKeyOnEvenIndices()
+
+    if (value == null) throw Property.Exceptions.propertyValueCanNotBeNull()
+    if (value.isInstanceOf[Iterable[_]]) throw new 
java.lang.IllegalArgumentException("not supported data type")
+    if (value.isInstanceOf[Array[_]]) throw new 
java.lang.IllegalArgumentException("not supported data type")
+    if (value.isInstanceOf[java.util.List[_]]) throw new 
java.lang.IllegalArgumentException("not supported data type")
+    if (value.isInstanceOf[java.util.Map[_, _]]) throw new 
java.lang.IllegalArgumentException("not supported data type")
+
+    if (key.toString.isEmpty) throw 
Property.Exceptions.propertyKeyCanNotBeEmpty()
+    if (Graph.Hidden.isHidden(key.toString)) throw 
Property.Exceptions.propertyKeyCanNotBeAHiddenKey(Graph.Hidden.hide(key.toString))
+
+  }
+}
 
 case class S2Property[V](element: S2Edge,
                          labelMeta: LabelMeta,
@@ -47,24 +87,26 @@ case class S2Property[V](element: S2Edge,
     innerValWithTs.bytes
   }
 
-  override def isPresent: Boolean = ???
+  @volatile var isRemoved = false
+
+  override def isPresent: Boolean = !isRemoved
 
-  override def remove(): Unit = ???
+  override def remove(): Unit = isRemoved = true
 
   override def hashCode(): Int = {
     MurmurHash3.stringHash(labelMeta.labelId + "," + labelMeta.id.get + "," + 
key + "," + value + "," + ts)
   }
 
   override def equals(other: Any): Boolean = other match {
-    case p: S2Property[_] =>
-      labelMeta.labelId == p.labelMeta.labelId &&
-      labelMeta.seq == p.labelMeta.seq &&
-      key == p.key && value == p.value && ts == p.ts
+    case p: Property[_] =>
+      key == p.key() && v == p.value()
     case _ => false
   }
 
   override def toString(): String = {
-    Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, 
"ts" -> ts).toString
+//    Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, 
"ts" -> ts).toString
+    // vp[name->marko]
+    s"p[${key}->${value}]"
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index 7fd2ac4..afee5d9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -20,15 +20,18 @@
 package org.apache.s2graph.core
 
 import java.util
-import java.util.function.{Consumer, BiConsumer}
+import java.util.function.{BiConsumer, Consumer}
 
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta, Service, 
ServiceColumn}
+import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types._
+import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
+import org.apache.tinkerpop.gremlin.structure.Graph.Features.{ElementFeatures, 
VertexFeatures}
 import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
-import org.apache.tinkerpop.gremlin.structure.{Direction, Vertex, Edge, 
VertexProperty}
+import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Property, T, 
Vertex, VertexProperty}
 import play.api.libs.json.Json
+
 import scala.collection.JavaConverters._
 
 case class S2Vertex(graph: S2Graph,
@@ -86,7 +89,7 @@ case class S2Vertex(graph: S2Graph,
 
   override def equals(obj: Any) = {
     obj match {
-      case otherVertex: S2Vertex =>
+      case otherVertex: Vertex =>
         val ret = id == otherVertex.id
         //        logger.debug(s"Vertex.equals: $this, $obj => $ret")
         ret
@@ -95,7 +98,9 @@ case class S2Vertex(graph: S2Graph,
   }
 
   override def toString(): String = {
-    Map("id" -> id.toString(), "ts" -> ts, "props" -> "", "op" -> op, 
"belongLabelIds" -> belongLabelIds).toString()
+    // V + L_BRACKET + vertex.id() + R_BRACKET;
+    // v[VertexId(1, 1481694411514)]
+    s"v[${id}]"
   }
 
   def toLogString(): String = {
@@ -132,50 +137,111 @@ case class S2Vertex(graph: S2Graph,
   }
 
   override def edges(direction: Direction, labelNames: String*): 
util.Iterator[Edge] = {
-    graph.fetchEdges(this, labelNames, direction.name())
+    val labelNameList = {
+      if (labelNames.isEmpty) {
+        val labelList =
+          // TODO: Let's clarify direction
+          if (direction == Direction.IN) Label.findBySrcColumnId(id.colId)
+          else Label.findBySrcColumnId(id.colId)
+        labelList.map(_.label)
+      } else {
+        labelNames
+      }
+    }
+    graph.fetchEdges(this, labelNameList, direction.name())
+  }
+
+  // do no save to storage
+  def propertyInner[V](cardinality: Cardinality, key: String, value: V, 
objects: AnyRef*): VertexProperty[V] = {
+    S2Property.assertValidProp(key, value)
+
+    cardinality match {
+      case Cardinality.single =>
+        val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new 
RuntimeException(s"$key is not configured on Vertex."))
+        val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
+        props.put(key, newProps)
+        newProps
+      case _ => throw new RuntimeException("only single cardinality is 
supported.")
+    }
   }
 
   override def property[V](cardinality: Cardinality, key: String, value: V, 
objects: AnyRef*): VertexProperty[V] = {
+    S2Property.assertValidProp(key, value)
+
     cardinality match {
       case Cardinality.single =>
         val columnMeta = serviceColumn.metasInvMap.getOrElse(key, throw new 
RuntimeException(s"$key is not configured on Vertex."))
         val newProps = new S2VertexProperty[V](this, columnMeta, key, value)
         props.put(key, newProps)
+
+        // FIXME: save to persistent for tp test
+        graph.addVertex(this)
         newProps
       case _ => throw new RuntimeException("only single cardinality is 
supported.")
     }
   }
 
-  override def addEdge(label: String, vertex: Vertex, kvs: AnyRef*): S2Edge = {
+  override def addEdge(label: String, vertex: Vertex, kvs: AnyRef*): Edge = {
     vertex match {
       case otherV: S2Vertex =>
-        val props = ElementHelper.asMap(kvs: _*).asScala.toMap
+        if (!graph.features().edge().supportsUserSuppliedIds() && 
kvs.contains(T.id)) {
+          throw Exceptions.userSuppliedIdsNotSupported()
+        }
+
+        val props = S2Property.kvsToProps(kvs)
+
         //TODO: direction, operation, _timestamp need to be reserved property 
key.
         val direction = props.get("direction").getOrElse("out").toString
         val ts = 
props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
         val operation = 
props.get("operation").map(_.toString).getOrElse("insert")
 
-        graph.addEdgeInner(this, otherV, label, direction, props, ts, 
operation)
+        try {
+          graph.addEdgeInner(this, otherV, label, direction, props, ts, 
operation)
+        } catch {
+          case e: LabelNotExistException => throw new 
java.lang.IllegalArgumentException
+         }
+      case null => throw new java.lang.IllegalArgumentException
       case _ => throw new RuntimeException("only S2Graph vertex can be used.")
     }
   }
 
   override def property[V](key: String): VertexProperty[V] = {
-    props.get(key).asInstanceOf[S2VertexProperty[V]]
+    if (props.containsKey(key)) {
+      props.get(key).asInstanceOf[S2VertexProperty[V]]
+    } else {
+      VertexProperty.empty()
+    }
   }
 
   override def properties[V](keys: String*): util.Iterator[VertexProperty[V]] 
= {
-    val ls = for {
-      key <- keys
-    } yield {
-      property[V](key)
+    val ls = new util.ArrayList[VertexProperty[V]]()
+    if (keys.isEmpty) {
+      props.keySet().forEach(new Consumer[String] {
+        override def accept(key: String): Unit = {
+          if (!ColumnMeta.reservedMetaNamesSet(key)) ls.add(property[V](key))
+        }
+      })
+    } else {
+      keys.foreach { key => ls.add(property[V](key)) }
     }
-    ls.iterator.asJava
+    ls.iterator
   }
 
-  override def remove(): Unit = ???
+  override def label(): String = {
+    serviceColumn.columnName
+//    if (serviceColumn.columnName == Vertex.DEFAULT_LABEL) 
Vertex.DEFAULT_LABEL // TP3 default vertex label name
+//    else {
+//      service.serviceName + S2Vertex.VertexLabelDelimiter + 
serviceColumn.columnName
+//    }
+  }
 
-  override def label(): String = service.serviceName + 
S2Vertex.VertexLabelDelimiter + serviceColumn.columnName
+  override def remove(): Unit = {
+    if (graph.features().vertex().supportsRemoveVertices()) {
+      // remove edge
+    } else {
+      throw Vertex.Exceptions.vertexRemovalNotSupported()
+    }
+  }
 }
 
 object S2Vertex {
@@ -196,13 +262,14 @@ object S2Vertex {
   def fillPropsWithTs(vertex: S2Vertex, props: Props): Unit = {
     props.forEach(new BiConsumer[String, S2VertexProperty[_]] {
       override def accept(key: String, p: S2VertexProperty[_]): Unit = {
-        vertex.property(Cardinality.single, key, p.value)
+//        vertex.property(Cardinality.single, key, p.value)
+        vertex.propertyInner(Cardinality.single, key, p.value)
       }
     })
   }
 
   def fillPropsWithTs(vertex: S2Vertex, state: State): Unit = {
-    state.foreach { case (k, v) => vertex.property(Cardinality.single, k.name, 
v.value) }
+    state.foreach { case (k, v) => vertex.propertyInner(Cardinality.single, 
k.name, v.value) }
   }
 
   def propsToState(props: Props): State = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
index 9f8c682..c5258fb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
@@ -27,6 +27,8 @@ import org.apache.tinkerpop.gremlin.structure.{Property, 
VertexProperty, Vertex
 
 import scala.util.hashing.MurmurHash3
 
+case class S2VertexPropertyId[V](columnMeta: ColumnMeta, value: V)
+
 case class S2VertexProperty[V](element: S2Vertex,
                                columnMeta: ColumnMeta,
                                key: String,
@@ -42,27 +44,32 @@ case class S2VertexProperty[V](element: S2Vertex,
 
   override def properties[U](strings: String*): util.Iterator[Property[U]] = 
???
 
-  override def property[V](key: String, value: V): Property[V] = ???
+  override def property[A](key: String, value: A): Property[A] = ???
+
+  override def remove(): Unit = {
+    if (!element.graph.features.vertex.properties.supportsRemoveProperty) {
+      throw Property.Exceptions.propertyRemovalNotSupported
+    }
+    isRemoved = true
+  }
 
-  override def remove(): Unit = ???
+  override def id(): AnyRef = S2VertexPropertyId(columnMeta, v)
 
-  override def id(): AnyRef = ???
+  @volatile var isRemoved = false
 
-  override def isPresent: Boolean = ???
+  override def isPresent: Boolean = !isRemoved
 
   override def hashCode(): Int = {
-    MurmurHash3.stringHash(columnMeta.columnId + "," + columnMeta.id.get + "," 
+ key + "," + value)
+    (element, id()).hashCode()
   }
 
   override def equals(other: Any): Boolean = other match {
-    case p: S2VertexProperty[_] =>
-      columnMeta.columnId == p.columnMeta.columnId &&
-        columnMeta.seq == p.columnMeta.seq &&
-        key == p.key && value == p.value
+    case p: VertexProperty[_] => element == p.element && id() == p.id()
     case _ => false
   }
 
   override def toString(): String = {
-    Map("columnMeta" -> columnMeta.toString, "key" -> key, "value" -> 
value).toString
+//    Map("columnMeta" -> columnMeta.toString, "key" -> key, "value" -> 
value).toString
+    s"vp[${key}->${value}]"
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2DataTypeFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2DataTypeFeatures.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2DataTypeFeatures.scala
new file mode 100644
index 0000000..a79da46
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2DataTypeFeatures.scala
@@ -0,0 +1,43 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+case class S2DataTypeFeatures() extends Features.DataTypeFeatures {
+
+  override def supportsStringValues(): Boolean = true
+
+  override def supportsFloatValues(): Boolean = true
+
+  override def supportsDoubleValues(): Boolean = true
+
+  override def supportsIntegerValues(): Boolean = true
+
+  override def supportsLongValues(): Boolean = true
+
+  override def supportsBooleanValues(): Boolean = true
+
+  override def supportsDoubleArrayValues(): Boolean = false
+
+  override def supportsStringArrayValues(): Boolean = false
+
+  override def supportsIntegerArrayValues(): Boolean = false
+
+  override def supportsByteValues(): Boolean = false
+
+  override def supportsUniformListValues(): Boolean = false
+
+  override def supportsMapValues(): Boolean = false
+
+  override def supportsBooleanArrayValues(): Boolean = false
+
+  override def supportsSerializableValues(): Boolean = true
+
+  override def supportsLongArrayValues(): Boolean = false
+
+  override def supportsMixedListValues(): Boolean = false
+
+  override def supportsFloatArrayValues(): Boolean = false
+
+  override def supportsByteArrayValues(): Boolean = false
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgeFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgeFeatures.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgeFeatures.scala
new file mode 100644
index 0000000..825b333
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgeFeatures.scala
@@ -0,0 +1,11 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+class S2EdgeFeatures extends S2ElementFeatures with Features.EdgeFeatures {
+  override def supportsRemoveEdges(): Boolean = true
+
+  override def supportsAddEdges(): Boolean = true
+
+  override def properties(): Features.EdgePropertyFeatures = new 
S2EdgePropertyFeatures
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgePropertyFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgePropertyFeatures.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgePropertyFeatures.scala
new file mode 100644
index 0000000..556bbdc
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2EdgePropertyFeatures.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+class S2EdgePropertyFeatures extends S2PropertyFeatures with 
Features.EdgePropertyFeatures {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2ElementFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2ElementFeatures.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2ElementFeatures.scala
new file mode 100644
index 0000000..bbb6a79
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2ElementFeatures.scala
@@ -0,0 +1,23 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+abstract class S2ElementFeatures extends Features.ElementFeatures {
+  override def supportsStringIds(): Boolean = true
+
+  override def supportsCustomIds(): Boolean = false
+
+  override def supportsUuidIds(): Boolean = false
+
+  override def supportsAddProperty(): Boolean = true
+
+  override def supportsRemoveProperty(): Boolean = true
+
+  override def supportsUserSuppliedIds(): Boolean = true
+
+  override def supportsAnyIds(): Boolean = false
+
+  override def supportsNumericIds(): Boolean = false
+
+//  override def willAllowId(id: scala.Any): Boolean = super.willAllowId(id)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2GraphFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2GraphFeatures.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2GraphFeatures.scala
new file mode 100644
index 0000000..e9aa247
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2GraphFeatures.scala
@@ -0,0 +1,19 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+import org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures
+
+
+class S2GraphFeatures extends GraphFeatures {
+  override def supportsComputer(): Boolean = false
+
+  override def supportsThreadedTransactions(): Boolean = false
+
+  override def supportsTransactions(): Boolean = false
+
+  override def supportsPersistence(): Boolean = true
+
+  override def variables(): Features.VariableFeatures = super.variables()
+
+  override def supportsConcurrentAccess(): Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2PropertyFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2PropertyFeatures.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2PropertyFeatures.scala
new file mode 100644
index 0000000..cf3316f
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2PropertyFeatures.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+class S2PropertyFeatures extends S2DataTypeFeatures with 
Features.PropertyFeatures {
+  override def supportsProperties(): Boolean = true
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2VariableFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2VariableFeatures.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2VariableFeatures.scala
new file mode 100644
index 0000000..6cbf129
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2VariableFeatures.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+class S2VariableFeatures extends S2DataTypeFeatures with 
Features.VariableFeatures {
+  override def supportsVariables(): Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2Variables.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2Variables.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Variables.scala
new file mode 100644
index 0000000..8a9c42b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Variables.scala
@@ -0,0 +1,6 @@
+package org.apache.s2graph.core.features
+
+
+class S2Variables {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexFeatures.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexFeatures.scala
new file mode 100644
index 0000000..14024fd
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexFeatures.scala
@@ -0,0 +1,18 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
+
+class S2VertexFeatures extends S2ElementFeatures with Features.VertexFeatures {
+  override def supportsAddVertices(): Boolean = true
+
+  override def supportsRemoveVertices(): Boolean = true
+
+  override def getCardinality(key: String): Cardinality = Cardinality.single
+
+  override def supportsMultiProperties(): Boolean = false
+
+  override def supportsMetaProperties(): Boolean = false
+
+  override def properties(): Features.VertexPropertyFeatures = new 
S2VertexPropertyFeatures()
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexPropertyFeatures.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexPropertyFeatures.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexPropertyFeatures.scala
new file mode 100644
index 0000000..592cc0b
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/features/S2VertexPropertyFeatures.scala
@@ -0,0 +1,24 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+class S2VertexPropertyFeatures extends S2PropertyFeatures with 
Features.VertexPropertyFeatures {
+
+  override def supportsStringIds(): Boolean = true
+
+  override def supportsUserSuppliedIds(): Boolean = true
+
+  override def supportsAddProperty(): Boolean = true
+
+  override def willAllowId(id: scala.Any): Boolean = true
+
+  override def supportsNumericIds(): Boolean = false
+
+  override def supportsRemoveProperty(): Boolean = true
+
+  override def supportsUuidIds(): Boolean = false
+
+  override def supportsCustomIds(): Boolean = false
+
+  override def supportsAnyIds(): Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
index 09d02d1..a92c93b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
@@ -24,13 +24,14 @@ import scalikejdbc._
 
 object ColumnMeta extends Model[ColumnMeta] {
 
-  val timeStampSeq = 0.toByte
-  val countSeq = -1.toByte
+  val timeStampSeq = -1.toByte
   val lastModifiedAtColumnSeq = 0.toByte
   val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", 
lastModifiedAtColumnSeq, "long")
   val maxValue = Byte.MaxValue
 
-  val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq, "long")
+  val timestamp = ColumnMeta(None, -1, "_timestamp", timeStampSeq.toByte, 
"long")
+  val reservedMetas = Seq(timestamp, lastModifiedAtColumn)
+  val reservedMetaNamesSet = reservedMetas.map(_.name).toSet
 
   def apply(rs: WrappedResultSet): ColumnMeta = {
     ColumnMeta(Some(rs.int("id")), rs.int("column_id"), rs.string("name"), 
rs.byte("seq"), rs.string("data_type").toLowerCase())
@@ -56,11 +57,16 @@ object ColumnMeta extends Model[ColumnMeta] {
     }
   }
 
-  def findByName(columnId: Int, name: String)(implicit session: DBSession = 
AutoSession) = {
+  def findByName(columnId: Int, name: String, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession) = {
     //    val cacheKey = s"columnId=$columnId:name=$name"
     val cacheKey = "columnId=" + columnId + ":name=" + name
-    withCache(cacheKey)( sql"""select * from column_metas where column_id = 
${columnId} and name = ${name}"""
-      .map { rs => ColumnMeta(rs) }.single.apply())
+    if (useCache) {
+      withCache(cacheKey)( sql"""select * from column_metas where column_id = 
${columnId} and name = ${name}"""
+          .map { rs => ColumnMeta(rs) }.single.apply())
+    } else {
+      sql"""select * from column_metas where column_id = ${columnId} and name 
= ${name}"""
+          .map { rs => ColumnMeta(rs) }.single.apply()
+    }
   }
 
   def insert(columnId: Int, name: String, dataType: String)(implicit session: 
DBSession = AutoSession) = {
@@ -73,8 +79,8 @@ object ColumnMeta extends Model[ColumnMeta] {
     }
   }
 
-  def findOrInsert(columnId: Int, name: String, dataType: String)(implicit 
session: DBSession = AutoSession): ColumnMeta = {
-    findByName(columnId, name) match {
+  def findOrInsert(columnId: Int, name: String, dataType: String, useCache: 
Boolean = true)(implicit session: DBSession = AutoSession): ColumnMeta = {
+    findByName(columnId, name, useCache) match {
       case Some(c) => c
       case None =>
         insert(columnId, name, dataType)
@@ -97,7 +103,7 @@ object ColumnMeta extends Model[ColumnMeta] {
     val columnMeta = findById(id)
     val (columnId, name) = (columnMeta.columnId, columnMeta.name)
     sql"""delete from column_metas where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", 
s"colunmId=$columnId")
+    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", 
s"columnId=$columnId")
     cacheKeys.foreach { key =>
       expireCache(key)
       expireCaches(key)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
index 4a7e931..a16334a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
@@ -73,6 +73,7 @@ object LabelMeta extends Model[LabelMeta] {
   // Each reserved column(_timestamp, timestamp) has same seq number, starts 
with '_' has high priority
   val reservedMetas = List(empty, label, direction, lastDeletedAt, from, 
fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = 
lm.name.drop(1))) }.reverse
   val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, 
fromHash, to, degree, timestamp, count)
+  val reservedMetaNamesSet = reservedMetasInner.map(_.name).toSet
 
   val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", 
"_from_hash", "label", "direction", "timestamp", "_timestamp")
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
index 7a18a49..e21072e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
@@ -20,17 +20,18 @@
 package org.apache.s2graph.core.mysqls
 
 import java.util.concurrent.Executors
+import java.util.concurrent.atomic.AtomicLong
 
-import com.typesafe.config.{ConfigFactory, Config}
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
-import play.api.libs.json.{Json, JsObject, JsValue}
+import play.api.libs.json.{JsObject, JsValue, Json}
 import scalikejdbc._
 
 import scala.concurrent.ExecutionContext
 import scala.io.Source
 import scala.language.{higherKinds, implicitConversions}
-import scala.util.{Success, Failure, Try}
+import scala.util.{Failure, Success, Try}
 
 object Model {
   var maxSize = 10000
@@ -40,6 +41,8 @@ object Model {
   val ec = ExecutionContext.fromExecutor(threadPool)
   val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"
 
+  private val ModelReferenceCount = new AtomicLong(0L)
+
   def apply(config: Config) = {
     maxSize = config.getInt("cache.max.size")
     ttl = config.getInt("cache.ttl.seconds")
@@ -58,6 +61,8 @@ object Model {
       settings)
 
     checkSchema()
+
+    ModelReferenceCount.incrementAndGet()
   }
 
   def checkSchema(): Unit = {
@@ -107,9 +112,37 @@ object Model {
     }
   }
 
-  def shutdown() = {
-    ConnectionPool.closeAll()
-  }
+  def shutdown(modelDataDelete: Boolean = false) =
+    if (ModelReferenceCount.decrementAndGet() <= 0) {
+      // FIXME: When Model is served by embedded database and deleteData is 
set, Model deletes
+      // the underlying database. Its purpose is clearing runtime footprint 
when running tests.
+      if (modelDataDelete) {
+        withTx { implicit session =>
+          sql"SHOW TABLES"
+              .map(rs => rs.string(1))
+              .list
+              .apply()
+              .map { table => s"TRUNCATE TABLE $table" }
+        } match {
+          case Success(stmts) =>
+            val newStmts = List("SET FOREIGN_KEY_CHECKS = 0") ++ stmts ++ 
List("SET FOREIGN_KEY_CHECKS = 1")
+            withTx { implicit session =>
+              newStmts.foreach { stmt =>
+                session.execute(stmt)
+              }
+            } match {
+              case Success(_) =>
+                logger.info(s"Success to truncate models: $stmts")
+              case Failure(e) =>
+                throw new IllegalStateException(s"Failed to truncate models", 
e)
+            }
+          case Failure(e) =>
+            throw new IllegalStateException(s"Failed to list models", e)
+        }
+      }
+      clearCache()
+      ConnectionPool.closeAll()
+    }
 
   def loadCache() = {
     Service.findAll()
@@ -120,6 +153,15 @@ object Model {
     ColumnMeta.findAll()
   }
 
+  def clearCache() = {
+    Service.expireAll()
+    ServiceColumn.expireAll()
+    Label.expireAll()
+    LabelMeta.expireAll()
+    LabelIndex.expireAll()
+    ColumnMeta.expireAll()
+  }
+
   def extraOptions(options: Option[String]): Map[String, JsValue] = options 
match {
     case None => Map.empty
     case Some(v) =>
@@ -169,6 +211,11 @@ trait Model[V] extends SQLSyntaxSupport[V] {
 
   val expireCaches = listCache.invalidate _
 
+  def expireAll() = {
+    listCache.invalidateAll()
+    optionCache.invalidateAll()
+  }
+
   def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
     case (key, value) => optionCache.put(key, Option(value))
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
index 7fdda45..20330c4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
@@ -73,8 +73,8 @@ object Service extends Model[Service] {
   }
 
   def findOrInsert(serviceName: String, cluster: String, hTableName: String,
-                   preSplitSize: Int, hTableTTL: Option[Int], 
compressionAlgorithm: String)(implicit session: DBSession = AutoSession): 
Service = {
-    findByName(serviceName) match {
+                   preSplitSize: Int, hTableTTL: Option[Int], 
compressionAlgorithm: String, useCache: Boolean = true)(implicit session: 
DBSession = AutoSession): Service = {
+    findByName(serviceName, useCache) match {
       case Some(s) => s
       case None =>
         insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, 
compressionAlgorithm)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index f791f22..8614132 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -42,11 +42,16 @@ object ServiceColumn extends Model[ServiceColumn] {
 //    find(service.id.get, columnName, useCache)
 //  }
 
-  def findById(id: Int)(implicit session: DBSession = AutoSession): 
ServiceColumn = {
-//    val cacheKey = s"id=$id"
+  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession 
= AutoSession): ServiceColumn = {
     val cacheKey = "id=" + id
-    withCache(cacheKey)(sql"""select * from service_columns where id = 
${id}""".map { x => ServiceColumn(x) }.single.apply).get
+
+    if (useCache) {
+      withCache(cacheKey)(sql"""select * from service_columns where id = 
${id}""".map { x => ServiceColumn(x) }.single.apply).get
+    } else {
+      sql"""select * from service_columns where id = ${id}""".map { x => 
ServiceColumn(x) }.single.apply.get
+    }
   }
+
   def find(serviceId: Int, columnName: String, useCache: Boolean = 
true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
 //    val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
     val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
@@ -67,7 +72,7 @@ object ServiceColumn extends Model[ServiceColumn] {
          values(${serviceId}, ${columnName}, ${columnType}, 
${schemaVersion})""".execute.apply()
   }
   def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val serviceColumn = findById(id)
+    val serviceColumn = findById(id, useCache = false)
     val (serviceId, columnName) = (serviceColumn.serviceId, 
serviceColumn.columnName)
     sql"""delete from service_columns where id = ${id}""".execute.apply()
     val cacheKeys = List(s"id=$id", 
s"serviceId=$serviceId:columnName=$columnName")
@@ -76,8 +81,8 @@ object ServiceColumn extends Model[ServiceColumn] {
       expireCaches(key)
     }
   }
-  def findOrInsert(serviceId: Int, columnName: String, columnType: 
Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION)(implicit 
session: DBSession = AutoSession): ServiceColumn = {
-    find(serviceId, columnName) match {
+  def findOrInsert(serviceId: Int, columnName: String, columnType: 
Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: 
Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
+    find(serviceId, columnName, useCache) match {
       case Some(sc) => sc
       case None =>
         insert(serviceId, columnName, columnType, schemaVersion)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index fb75765..a9b523c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -266,9 +266,11 @@ abstract class Storage[Q, R](val graph: S2Graph,
                   replicationScopeOpt: Option[Int] = None,
                   totalRegionCount: Option[Int] = None): Unit
 
+  def truncateTable(zkAddr: String, tableNameStr: String): Unit = {}
 
+  def deleteTable(zkAddr: String, tableNameStr: String): Unit = {}
 
-
+  def shutdown(): Unit
 
   /** Public Interface */
   def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
@@ -1096,7 +1098,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
   }
 
   def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = {
-    edge.property(LabelMeta.degree.name, degreeVal, edge.ts)
+    edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts)
     val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
       indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Put, durability = indexEdge.label.durability))
     }
@@ -1112,4 +1114,6 @@ abstract class Storage[Q, R](val graph: S2Graph,
   }
 
   def info: Map[String, String] = Map("className" -> 
this.getClass.getSimpleName)
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index d4ae451..ede1933 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -32,22 +32,23 @@ import 
org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
+import org.apache.hadoop.hbase.{TableName, HColumnDescriptor, 
HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, 
ScanWithRange}
-import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.apache.s2graph.core.types.{VertexId, HBaseType}
 import org.apache.s2graph.core.utils._
 import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
 import org.hbase.async._
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
 import scala.concurrent.duration.Duration
 import scala.util.Try
+import scala.util.control.NonFatal
 import scala.util.hashing.MurmurHash3
 
 
@@ -84,6 +85,10 @@ object AsynchbaseStorage {
     client
   }
 
+  def shutdown(client: HBaseClient): Unit = {
+    client.shutdown().join()
+  }
+
   case class ScanWithRange(scan: Scanner, offset: Int, limit: Int)
   type AsyncRPC = Either[GetRequest, ScanWithRange]
 }
@@ -466,6 +471,13 @@ class AsynchbaseStorage(override val graph: S2Graph,
   }
 
 
+  override def shutdown(): Unit = {
+    flush()
+    clients.foreach { client =>
+      AsynchbaseStorage.shutdown(client)
+    }
+  }
+
   override def createTable(_zkAddr: String,
                            tableName: String,
                            cfs: List[String],
@@ -495,6 +507,8 @@ class AsynchbaseStorage(override val graph: S2Graph,
               .setMinVersions(0)
               .setBlocksize(32768)
               .setBlockCacheEnabled(true)
+                // FIXME: For test!!
+              .setInMemory(true)
             if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
             if (replicationScopeOpt.isDefined) 
columnDesc.setScope(replicationScopeOpt.get)
             desc.addFamily(columnDesc)
@@ -516,6 +530,59 @@ class AsynchbaseStorage(override val graph: S2Graph,
     }
   }
 
+  override def truncateTable(zkAddr: String, tableNameStr: String): Unit = {
+    val tableName = TableName.valueOf(tableNameStr)
+    val adminTry = Try(getAdmin(zkAddr))
+    if (adminTry.isFailure) return
+    val admin = adminTry.get
+
+    if (!Try(admin.tableExists(tableName)).getOrElse(false)) {
+      logger.info(s"No table to truncate ${tableNameStr}")
+      return
+    }
+
+    Try(admin.isTableDisabled(tableName)).map {
+      case true =>
+        logger.info(s"${tableNameStr} is already disabled.")
+
+      case false =>
+        logger.info(s"Before disabling to trucate ${tableNameStr}")
+        Try(admin.disableTable(tableName)).recover {
+          case NonFatal(e) =>
+            logger.info(s"Failed to disable ${tableNameStr}: ${e}")
+        }
+        logger.info(s"After disabling to trucate ${tableNameStr}")
+    }
+
+    logger.info(s"Before truncating ${tableNameStr}")
+    Try(admin.truncateTable(tableName, true)).recover {
+      case NonFatal(e) =>
+        logger.info(s"Failed to truncate ${tableNameStr}: ${e}")
+    }
+    logger.info(s"After truncating ${tableNameStr}")
+    Try(admin.close()).recover {
+      case NonFatal(e) =>
+        logger.info(s"Failed to close admin ${tableNameStr}: ${e}")
+    }
+    Try(admin.getConnection.close()).recover {
+      case NonFatal(e) =>
+        logger.info(s"Failed to close connection ${tableNameStr}: ${e}")
+    }
+
+  }
+
+  override def deleteTable(zkAddr: String, tableNameStr: String): Unit = {
+    val admin = getAdmin(zkAddr)
+    val tableName = TableName.valueOf(tableNameStr)
+    if (!admin.tableExists(tableName)) {
+      return
+    }
+    if (admin.isTableEnabled(tableName)) {
+      admin.disableTable(tableName)
+    }
+    admin.deleteTable(tableName)
+    admin.close()
+  }
 
   /** Asynchbase implementation override default getVertices to use future 
Cache */
   override def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
@@ -549,7 +616,9 @@ class AsynchbaseStorage(override val graph: S2Graph,
       scan.setFamily(Serializable.edgeCf)
       scan.setMaxVersions(1)
 
-      scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { kvsLs =>
+      scan.nextRows(10000).toFuture(emptyKeyValuesLs).map {
+        case null => Seq.empty
+        case kvsLs =>
         kvsLs.flatMap { kvs =>
           kvs.flatMap { kv =>
             indexEdgeDeserializer.fromKeyValues(Seq(kv), None)
@@ -567,10 +636,12 @@ class AsynchbaseStorage(override val graph: S2Graph,
       scan.setFamily(Serializable.vertexCf)
       scan.setMaxVersions(1)
 
-      scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { kvsLs =>
-        kvsLs.flatMap { kvs =>
-          vertexDeserializer.fromKeyValues(kvs, None)
-        }
+      scan.nextRows(10000).toFuture(emptyKeyValuesLs).map {
+        case null => Seq.empty
+        case kvsLs =>
+          kvsLs.flatMap { kvs =>
+            vertexDeserializer.fromKeyValues(kvs, None)
+          }
       }
     }
     Future.sequence(futures).map(_.flatten)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d05d8a49/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 8f47f97..6095cea 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -74,8 +74,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
            val degreeVal = bytesToLongFunc(kv.value, 0)
            val tgtVertexId = VertexId(ServiceColumn.Default, 
InnerVal.withStr("0", schemaVer))
 
-           edge.property(LabelMeta.timestamp.name, version, version)
-           edge.property(LabelMeta.degree.name, degreeVal, version)
+           edge.propertyInner(LabelMeta.timestamp.name, version, version)
+           edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
            edge.tgtVertex = graph.newVertex(tgtVertexId, version)
            edge.op = GraphUtil.defaultOpByte
            edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
@@ -119,9 +119,9 @@ class IndexEdgeDeserializable(graph: S2Graph,
              if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
 
              if (k == LabelMeta.degree) {
-               edge.property(LabelMeta.degree.name, v.value, version)
+               edge.propertyInner(LabelMeta.degree.name, v.value, version)
              } else {
-               edge.property(meta.name, v.value, version)
+               edge.propertyInner(meta.name, v.value, version)
              }
            }
 
@@ -129,13 +129,13 @@ class IndexEdgeDeserializable(graph: S2Graph,
            if (op == GraphUtil.operations("incrementCount")) {
              //        val countVal = Bytes.toLong(kv.value)
              val countVal = bytesToLongFunc(kv.value, 0)
-             edge.property(LabelMeta.count.name, countVal, version)
+             edge.propertyInner(LabelMeta.count.name, countVal, version)
            } else {
              val (props, endAt) = bytesToKeyValues(kv.value, 0, 
kv.value.length, schemaVer, label)
              props.foreach { case (k, v) =>
                if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
 
-               edge.property(k.name, v.value, version)
+               edge.propertyInner(k.name, v.value, version)
              }
            }
 
@@ -146,7 +146,7 @@ class IndexEdgeDeserializable(graph: S2Graph,
                TargetVertexId(ServiceColumn.Default, vId.innerVal)
              } else tgtVertexIdRaw
 
-           edge.property(LabelMeta.timestamp.name, tsVal, version)
+           edge.propertyInner(LabelMeta.timestamp.name, tsVal, version)
            edge.tgtVertex = graph.newVertex(tgtVertexId, version)
            edge.op = op
            edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))


Reply via email to