Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 625e2217c -> 7951f5520


[S2GRAPH-146]: Merge two IndexEdgeDeserializable under tall, wide package into 
one.

JIRA:
    [S2GRAPH-146] https://issues.apache.org/jira/browse/S2GRAPH-146

Pull Request:
    Closes #111

Authors
  DO YUNG YOON: [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7951f552
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7951f552
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7951f552

Branch: refs/heads/master
Commit: 7951f5520be7470135393867d85bd22fb2346419
Parents: 625e221
Author: DO YUNG YOON <[email protected]>
Authored: Sun Dec 11 21:49:53 2016 +0900
Committer: daewon <[email protected]>
Committed: Tue Dec 13 10:44:31 2016 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/JSONParser.scala    |   1 +
 .../scala/org/apache/s2graph/core/S2Edge.scala  |  13 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala |  61 ++++--
 .../org/apache/s2graph/core/mysqls/Label.scala  |   6 +-
 .../s2graph/core/mysqls/ServiceColumn.scala     |  11 +-
 .../s2graph/core/storage/Deserializable.scala   |  26 +--
 .../apache/s2graph/core/storage/Storage.scala   |  45 ++--
 .../core/storage/StorageDeserializable.scala    |  21 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |  38 +++-
 .../tall/IndexEdgeDeserializable.scala          | 215 ++++++++++---------
 .../wide/IndexEdgeDeserializable.scala          | 201 +++++++++--------
 .../tall/SnapshotEdgeDeserializable.scala       | 118 +++++-----
 .../wide/SnapshotEdgeDeserializable.scala       | 105 ++++-----
 .../serde/vertex/VertexDeserializable.scala     |  64 +++---
 .../apache/s2graph/core/types/HBaseType.scala   |   3 +-
 .../s2graph/core/types/InnerValLike.scala       |  84 ++++----
 .../core/Integrate/IntegrateCommon.scala        |   2 +-
 .../apache/s2graph/core/JsonParserTest.scala    |   2 +-
 .../org/apache/s2graph/core/TestCommon.scala    |  34 +--
 .../s2graph/core/TestCommonWithModels.scala     |   2 +-
 .../s2graph/core/parsers/WhereParserTest.scala  |   4 +-
 .../core/storage/hbase/IndexEdgeTest.scala      |   3 +-
 22 files changed, 577 insertions(+), 482 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index 732840e..2effaf1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -182,6 +182,7 @@ object JSONParser {
     any match {
       case n: BigDecimal =>
         if (isNumeric) InnerVal.withNumber(n, version)
+        else if (dType == InnerVal.STRING) InnerVal.withStr(n.toString, 
version)
         else throw new IllegalDataTypeException(s"[ValueType] = BigDecimal, 
[DataType]: $dataType, [Input]: $any")
       case l: Long =>
         if (isNumeric) InnerVal.withLong(l, version)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 2960265..9cfde75 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -469,17 +469,11 @@ case class S2Edge(innerGraph: S2Graph,
   }
 
   override def hashCode(): Int = {
-    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + 
tgtVertex.innerId)
+    id().hashCode()
   }
 
   override def equals(other: Any): Boolean = other match {
-    case e: S2Edge =>
-      srcVertex.innerId == e.srcVertex.innerId &&
-        tgtVertex.innerId == e.tgtVertex.innerId &&
-        labelWithDir == e.labelWithDir && S2Edge.sameProps(propsWithTs, 
e.propsWithTs) &&
-        op == e.op && version == e.version &&
-        pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode 
== statusCode &&
-        parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt
+    case e: Edge => e.id().equals(e.id())
     case _ => false
   }
 
@@ -568,11 +562,12 @@ case class S2Edge(innerGraph: S2Graph,
 
   override def graph(): Graph = innerGraph
 
-  override def id(): AnyRef = (srcVertex.innerId, labelWithDir, 
tgtVertex.innerId)
+  override def id(): AnyRef = EdgeId(srcVertex.innerId, tgtVertex.innerId, 
label(), direction)
 
   override def label(): String = innerLabel.label
 }
 
+case class EdgeId(srcVertexId: InnerValLike, tgtVertexId: InnerValLike, 
labelName: String, direction: String)
 
 case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
                       edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index b3f3ac8..e8bda2c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -1120,8 +1120,8 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
              operation: String = "insert"): S2Edge = {
     val label = Label.findByName(labelName).getOrElse(throw new 
LabelNotExistException(labelName))
 
-    val srcVertexIdInnerVal = toInnerVal(srcId.toString, 
label.srcColumn.columnType, label.schemaVersion)
-    val tgtVertexIdInnerVal = toInnerVal(tgtId.toString, 
label.tgtColumn.columnType, label.schemaVersion)
+    val srcVertexIdInnerVal = toInnerVal(srcId, label.srcColumn.columnType, 
label.schemaVersion)
+    val tgtVertexIdInnerVal = toInnerVal(tgtId, label.tgtColumn.columnType, 
label.schemaVersion)
 
     val srcVertex = newVertex(SourceVertexId(label.srcColumn, 
srcVertexIdInnerVal), System.currentTimeMillis())
     val tgtVertex = newVertex(TargetVertexId(label.tgtColumn, 
tgtVertexIdInnerVal), System.currentTimeMillis())
@@ -1145,7 +1145,7 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
     val column = ServiceColumn.find(service.id.get, 
columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
     val op = GraphUtil.toOp(operation).getOrElse(throw new 
RuntimeException(s"$operation is not supported."))
 
-    val srcVertexId = VertexId(column, toInnerVal(id.toString, 
column.columnType, column.schemaVersion))
+    val srcVertexId = VertexId(column, toInnerVal(id, column.columnType, 
column.schemaVersion))
     val propsInner = column.propsToInnerVals(props) ++
       Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
 
@@ -1346,28 +1346,55 @@ class S2Graph(_config: Config)(implicit val ec: 
ExecutionContext) extends Graph
   override def vertices(vertexIds: AnyRef*): util.Iterator[structure.Vertex] = 
{
     val fetchVertices = vertexIds.lastOption.map { lastParam =>
       if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
-      else false
-    }.getOrElse(false)
+      else true
+    }.getOrElse(true)
 
-    val vertices = for {
-      vertexId <- vertexIds if vertexId.isInstanceOf[VertexId]
-    } yield newVertex(vertexId.asInstanceOf[VertexId])
-
-    if (fetchVertices) {
-      val future = getVertices(vertices).map { vs =>
-        val ls = new util.ArrayList[structure.Vertex]()
-        ls.addAll(vs)
-        ls.iterator()
+    if (vertexIds.isEmpty) {
+      //TODO: default storage need to be fixed.
+      Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
+    } else {
+      val vertices = for {
+        vertexId <- vertexIds if vertexId.isInstanceOf[VertexId]
+      } yield newVertex(vertexId.asInstanceOf[VertexId])
+
+      if (fetchVertices) {
+        val future = getVertices(vertices).map { vs =>
+          val ls = new util.ArrayList[structure.Vertex]()
+          ls.addAll(vs)
+          ls.iterator()
+        }
+        Await.result(future, WaitTimeout)
+      } else {
+        vertices.iterator
       }
-      Await.result(future, WaitTimeout)
+    }
+  }
+
+  override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
+    if (edgeIds.isEmpty) {
+      Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
     } else {
-      vertices.iterator
+      Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
     }
   }
 
+  def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
+    val s2EdgeIds = 
edgeIds.filter(_.isInstanceOf[EdgeId]).map(_.asInstanceOf[EdgeId])
+    val edgesToFetch = for {
+      id <- s2EdgeIds
+    } yield {
+        toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
+      }
+
+    checkEdges(edgesToFetch).map { stepResult =>
+      val ls = new util.ArrayList[structure.Edge]
+      stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
+      ls.iterator()
+    }
+  }
   override def tx(): Transaction = ???
 
-  override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ???
+
 
   override def variables(): Variables = ???
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 4970912..415a64e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -190,8 +190,8 @@ object Label extends Model[Label] {
         val serviceId = service.id.get
 
         /** insert serviceColumn */
-        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, 
Some(srcColumnType), schemaVersion)
-        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, 
Some(tgtColumnType), schemaVersion)
+        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, 
Some(srcColumnType))
+        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, 
Some(tgtColumnType))
 
         if (srcCol.columnType != srcColumnType) throw new 
RuntimeException(s"source service column type not matched ${srcCol.columnType} 
!= ${srcColumnType}")
         if (tgtCol.columnType != tgtColumnType) throw new 
RuntimeException(s"target service column type not matched ${tgtCol.columnType} 
!= ${tgtColumnType}")
@@ -480,7 +480,7 @@ case class Label(id: Option[Int], label: String,
     for {
       (k, v) <- props
       labelMeta <- metaPropsInvMap.get(k)
-      innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
     } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index ebbbf88..f791f22 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -76,7 +76,7 @@ object ServiceColumn extends Model[ServiceColumn] {
       expireCaches(key)
     }
   }
-  def findOrInsert(serviceId: Int, columnName: String, columnType: 
Option[String], schemaVersion: String)(implicit session: DBSession = 
AutoSession): ServiceColumn = {
+  def findOrInsert(serviceId: Int, columnName: String, columnType: 
Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION)(implicit 
session: DBSession = AutoSession): ServiceColumn = {
     find(serviceId, columnName) match {
       case Some(sc) => sc
       case None =>
@@ -97,9 +97,14 @@ object ServiceColumn extends Model[ServiceColumn] {
       var cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
       (cacheKey -> x)
     })
+    ls
   }
 }
-case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, 
columnType: String, schemaVersion: String)  {
+case class ServiceColumn(id: Option[Int],
+                         serviceId: Int,
+                         columnName: String,
+                         columnType: String,
+                         schemaVersion: String)  {
 
   lazy val service = Service.findById(serviceId)
   lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) 
:+ ColumnMeta.lastModifiedAtColumn
@@ -112,7 +117,7 @@ case class ServiceColumn(id: Option[Int], serviceId: Int, 
columnName: String, co
     for {
       (k, v) <- props
       labelMeta <- metasInvMap.get(k)
-      innerVal = toInnerVal(v.toString, labelMeta.dataType, schemaVersion)
+      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
     } yield labelMeta -> innerVal
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
index d82c507..af20483 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.{LabelWithDirection, SourceVertexId, 
VertexId}
+import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, 
SourceVertexId, VertexId}
 
 
 trait Deserializable[E] extends StorageDeserializable[E] {
@@ -28,16 +28,16 @@ trait Deserializable[E] extends StorageDeserializable[E] {
 
   type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
 
-  /** version 1 and version 2 share same code for parsing row key part */
-  def parseRow(kv: SKeyValue, version: String): RowKeyRaw = {
-    var pos = 0
-    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, version)
-    pos += srcIdLen
-    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-    pos += 4
-    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, 
pos)
-
-    val rowLen = srcIdLen + 4 + 1
-    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-  }
+//  /** version 1 and version 2 share same code for parsing row key part */
+//  def parseRow(kv: SKeyValue, version: String = HBaseType.DEFAULT_VERSION): 
RowKeyRaw = {
+//    var pos = 0
+//    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, version)
+//    pos += srcIdLen
+//    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+//    pos += 4
+//    val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+//
+//    val rowLen = srcIdLen + 4 + 1
+//    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 59b7518..e067983 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -75,7 +75,8 @@ abstract class Storage[Q, R](val graph: S2Graph,
    */
   def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): 
Serializable[SnapshotEdge] = {
     snapshotEdge.schemaVer match {
-      case VERSION1 | VERSION2 => new 
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
+//      case VERSION1 |
+      case VERSION2 => new 
serde.snapshotedge.wide.SnapshotEdgeSerializable(snapshotEdge)
       case VERSION3 | VERSION4 => new 
serde.snapshotedge.tall.SnapshotEdgeSerializable(snapshotEdge)
       case _ => throw new RuntimeException(s"not supported version: 
${snapshotEdge.schemaVer}")
     }
@@ -88,7 +89,8 @@ abstract class Storage[Q, R](val graph: S2Graph,
    */
   def indexEdgeSerializer(indexEdge: IndexEdge): Serializable[IndexEdge] = {
     indexEdge.schemaVer match {
-      case VERSION1 | VERSION2 | VERSION3 => new 
IndexEdgeSerializable(indexEdge)
+//      case VERSION1
+      case VERSION2 | VERSION3 => new IndexEdgeSerializable(indexEdge)
       case VERSION4 => new 
serde.indexedge.tall.IndexEdgeSerializable(indexEdge)
       case _ => throw new RuntimeException(s"not supported version: 
${indexEdge.schemaVer}")
 
@@ -112,25 +114,14 @@ abstract class Storage[Q, R](val graph: S2Graph,
    * then that storage implementation is responsible to provide implicit type 
conversion method on CanSKeyValue.
    * */
 
-  val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = 
Map(
-    VERSION1 -> new SnapshotEdgeDeserializable(graph),
-    VERSION2 -> new SnapshotEdgeDeserializable(graph),
-    VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph),
-    VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
-  )
-  def snapshotEdgeDeserializer(schemaVer: String) =
-    snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
+  val snapshotEdgeDeserializer: Deserializable[SnapshotEdge] = new 
serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph)
+
+  def snapshotEdgeDeserializer(schemaVer: String): 
Deserializable[SnapshotEdge] = snapshotEdgeDeserializer
 
   /** create deserializer that can parse stored CanSKeyValue into indexEdge. */
-  val indexEdgeDeserializers: Map[String, Deserializable[S2Edge]] = Map(
-    VERSION1 -> new IndexEdgeDeserializable(graph),
-    VERSION2 -> new IndexEdgeDeserializable(graph),
-    VERSION3 -> new IndexEdgeDeserializable(graph),
-    VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable(graph)
-  )
+  val indexEdgeDeserializer: Deserializable[S2Edge] = new 
serde.indexedge.tall.IndexEdgeDeserializable(graph)
 
-  def indexEdgeDeserializer(schemaVer: String) =
-    indexEdgeDeserializers.get(schemaVer).getOrElse(throw new 
RuntimeException(s"not supported version: ${schemaVer}"))
+  def indexEdgeDeserializer(schemaVer: String) = new 
serde.indexedge.tall.IndexEdgeDeserializable(graph)
 
   /** create deserializer that can parser stored CanSKeyValue into vertex. */
   val vertexDeserializer: Deserializable[S2Vertex] = new 
VertexDeserializable(graph)
@@ -251,6 +242,10 @@ abstract class Storage[Q, R](val graph: S2Graph,
   def flush(): Unit = {
   }
 
+  def fetchEdgesAll(): Future[Seq[S2Edge]]
+
+  def fetchVerticesAll(): Future[Seq[S2Vertex]]
+
   /**
    * create table on storage.
    * if storage implementation does not support namespace or table, then there 
is nothing to be done
@@ -280,7 +275,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
     def fromResult(kvs: Seq[SKeyValue],
                    version: String): Option[S2Vertex] = {
       if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+      else vertexDeserializer.fromKeyValues(kvs, None)
 //        .map(S2Vertex(graph, _))
     }
 
@@ -837,7 +832,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
       val queryOption = queryRequest.query.queryOption
       val queryParam = queryRequest.queryParam
       val schemaVer = queryParam.label.schemaVersion
-      val indexEdgeOpt = 
indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), 
Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+      val indexEdgeOpt = 
indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
       if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => 
indexEdge.copy(parentEdges = parentEdges))
       else indexEdgeOpt
     } catch {
@@ -855,7 +850,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
 //        logger.debug(s"SnapshottoEdge: $kv")
     val queryParam = queryRequest.queryParam
     val schemaVer = queryParam.label.schemaVersion
-    val snapshotEdgeOpt = 
snapshotEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), 
Seq(kv), queryParam.label.schemaVersion, cacheElementOpt)
+    val snapshotEdgeOpt = 
snapshotEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
 
     if (isInnerCall) {
       snapshotEdgeOpt.flatMap { snapshotEdge =>
@@ -901,7 +896,7 @@ abstract class Storage[Q, R](val graph: S2Graph,
       val schemaVer = queryParam.label.schemaVersion
       val cacheElementOpt =
         if (queryParam.isSnapshotEdge) None
-        else 
indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), 
Seq(kv), queryParam.label.schemaVersion, None)
+        else indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), None)
 
       val (degreeEdges, keyValues) = cacheElementOpt match {
         case None => (Nil, kvs)
@@ -971,14 +966,14 @@ abstract class Storage[Q, R](val graph: S2Graph,
     tgtVertexIdOpt match {
       case Some(tgtVertexId) => // _to is given.
         /** we use toSnapshotEdge so dont need to swap src, tgt */
-        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
-        val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, 
label.schemaVersion)
+        val src = srcVertex.innerId
+        val tgt = tgtVertexId
         val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), 
TargetVertexId(tgtColumn, tgt))
         val (srcV, tgtV) = (graph.newVertex(srcVId), graph.newVertex(tgtVId))
 
         graph.newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = 
propsWithTs)
       case None =>
-        val src = InnerVal.convertVersion(srcVertex.innerId, 
srcColumn.columnType, label.schemaVersion)
+        val src = srcVertex.innerId
         val srcVId = SourceVertexId(srcColumn, src)
         val srcV = graph.newVertex(srcVId)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
index d2a7de7..811cf62 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
@@ -106,14 +106,15 @@ object StorageDeserializable {
 }
 
 trait StorageDeserializable[E] {
-  def fromKeyValues[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], 
version: String, cacheElementOpt: Option[E]): Option[E] = {
-    try {
-      Option(fromKeyValuesInner(checkLabel, kvs, version, cacheElementOpt))
-    } catch {
-      case e: Exception =>
-        logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
-        None
-    }
-  }
-  def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], kvs: 
Seq[T], version: String, cacheElementOpt: Option[E]): E
+  def fromKeyValues[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: Option[E]): 
Option[E]
+//  = {
+//    try {
+//      Option(fromKeyValuesInner(kvs, cacheElementOpt))
+//    } catch {
+//      case e: Exception =>
+//        logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
+//        None
+//    }
+//  }
+//  def fromKeyValuesInner[T: CanSKeyValue](kvs: Seq[T], cacheElementOpt: 
Option[E]): E
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 93b2454..676ba41 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, 
HTableDescriptor, TableName}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, 
ScanWithRange}
 import org.apache.s2graph.core.types.{HBaseType, VertexId}
@@ -110,6 +110,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
   lazy val clients = Seq(client, clientWithFlush)
 
   private val emptyKeyValues = new util.ArrayList[KeyValue]()
+  private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
   private val emptyStepResult = new util.ArrayList[StepResult]()
 
   private def client(withWait: Boolean): HBaseClient = if (withWait) 
clientWithFlush else client
@@ -522,7 +523,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
                    version: String): Option[S2Vertex] = {
 
       if (kvs.isEmpty) None
-      else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+      else vertexDeserializer.fromKeyValues(kvs, None)
 //        .map(S2Vertex(graph, _))
     }
 
@@ -542,6 +543,39 @@ class AsynchbaseStorage(override val graph: S2Graph,
     Future.sequence(futures).map { result => result.toList.flatten }
   }
 
+  override def fetchEdgesAll(): Future[Seq[S2Edge]] = {
+    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case 
(hTableName, labels) =>
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.edgeCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { kvsLs =>
+        kvsLs.flatMap { kvs =>
+          kvs.flatMap { kv =>
+            indexEdgeDeserializer.fromKeyValues(Seq(kv), None)
+          }
+        }
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+
+  override def fetchVerticesAll(): Future[Seq[S2Vertex]] = {
+    val futures = 
ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case 
(hTableName, columns) =>
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.vertexCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(10000).toFuture(emptyKeyValuesLs).map { kvsLs =>
+        kvsLs.flatMap { kvs =>
+          vertexDeserializer.fromKeyValues(kvs, None)
+        }
+      }
+    }
+    Future.sequence(futures).map(_.flatten)
+  }
+
   class V4ResultHandler(scanner: Scanner, defer: 
Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends 
Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {
     val results = new util.ArrayList[KeyValue]()
     var offsetCount = 0

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 5549f4e..8f47f97 100644
--- 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -20,13 +20,11 @@
 package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, 
StorageDeserializable}
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core._
-import scala.collection.immutable
 
 object IndexEdgeDeserializable{
   def getNewInstance(graph: S2Graph) = new IndexEdgeDeserializable(graph)
@@ -38,101 +36,126 @@ class IndexEdgeDeserializable(graph: S2Graph,
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, 
Boolean, Int)
    type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
 
-   override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
-                                                    _kvs: Seq[T],
-                                                    schemaVer: String,
-                                                    cacheElementOpt: 
Option[S2Edge]): S2Edge = {
-
-     assert(_kvs.size == 1)
-
-     //     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-     val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
-//     logger.debug(s"[DES]: ${kv.toLogString}}")
-
-     val version = kv.timestamp
-     //    logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
-     var pos = 0
-     val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, schemaVer)
-     pos += srcIdLen
-     val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-     pos += 4
-     val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-     pos += 1
-
-     val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
-
-     val srcVertex = graph.newVertex(srcVertexId, version)
-     //TODO:
-     val edge = graph.newEdge(srcVertex, null,
-       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
S2Edge.EmptyState)
-     var tsVal = version
-
-     if (pos == kv.row.length) {
-       // degree
-       //      val degreeVal = Bytes.toLong(kv.value)
-       val degreeVal = bytesToLongFunc(kv.value, 0)
-       val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", 
schemaVer))
-
-       edge.property(LabelMeta.timestamp.name, version, version)
-       edge.property(LabelMeta.degree.name, degreeVal, version)
-       edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-       edge.op = GraphUtil.defaultOpByte
-       edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
-       edge
-     } else {
-       // not degree edge
-       val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
-       pos = endAt
-
-
-       val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length - 1) {
-         (HBaseType.defaultTgtVertexId, 0)
-       } else {
-         TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1, schemaVer)
-       }
-       val op = kv.row(kv.row.length-1)
-
-       val index = label.indicesMap.getOrElse(labelIdxSeq, throw new 
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
-       /** process indexProps */
-       val size = idxPropsRaw.length
-       (0 until size).foreach { ith =>
-         val meta = index.sortKeyTypesArray(ith)
-         val (k, v) = idxPropsRaw(ith)
-         if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
-
-         if (k == LabelMeta.degree) {
-           edge.property(LabelMeta.degree.name, v.value, version)
+   override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+                                               cacheElementOpt: 
Option[S2Edge]): Option[S2Edge] = {
+
+     try {
+       assert(_kvs.size == 1)
+
+       //     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+       val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+       //     logger.debug(s"[DES]: ${kv.toLogString}}")
+
+       val version = kv.timestamp
+
+       var pos = 0
+       val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, HBaseType.DEFAULT_VERSION)
+       pos += srcIdLen
+       val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+       pos += 4
+       val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+       pos += 1
+
+       if (isInverted) None
+       else {
+         val label = Label.findById(labelWithDir.labelId)
+         val schemaVer = label.schemaVersion
+         val srcVertex = graph.newVertex(srcVertexId, version)
+         //TODO:
+         val edge = graph.newEdge(srcVertex, null,
+           label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
S2Edge.EmptyState)
+         var tsVal = version
+         val isTallSchema = label.schemaVersion == HBaseType.VERSION4
+         val isDegree = if (isTallSchema) pos == kv.row.length else 
kv.qualifier.isEmpty
+
+         if (isDegree) {
+           // degree
+           //      val degreeVal = Bytes.toLong(kv.value)
+           val degreeVal = bytesToLongFunc(kv.value, 0)
+           val tgtVertexId = VertexId(ServiceColumn.Default, 
InnerVal.withStr("0", schemaVer))
+
+           edge.property(LabelMeta.timestamp.name, version, version)
+           edge.property(LabelMeta.degree.name, degreeVal, version)
+           edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+           edge.op = GraphUtil.defaultOpByte
+           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
          } else {
-           edge.property(meta.name, v.value, version)
+           // not degree edge
+           val (idxPropsRaw, endAt) =
+             if (isTallSchema) bytesToProps(kv.row, pos, schemaVer)
+             else {
+               bytesToProps(kv.qualifier, 0, schemaVer)
+             }
+           pos = endAt
+
+           val (tgtVertexIdRaw, tgtVertexIdLen) = if (isTallSchema) {
+             if (endAt == kv.row.length - 1) {
+               (HBaseType.defaultTgtVertexId, 0)
+             } else {
+               TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1, 
schemaVer)
+             }
+           } else {
+             if (endAt == kv.qualifier.length) {
+               (HBaseType.defaultTgtVertexId, 0)
+             } else {
+               TargetVertexId.fromBytes(kv.qualifier, endAt, 
kv.qualifier.length, schemaVer)
+             }
+           }
+           pos += tgtVertexIdLen
+
+           val op =
+             if (isTallSchema) kv.row(kv.row.length - 1)
+             else {
+               if (kv.qualifier.length == pos) GraphUtil.defaultOpByte
+               else kv.qualifier(kv.qualifier.length - 1)
+             }
+
+           val index = label.indicesMap.getOrElse(labelIdxSeq, throw new 
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+           /** process indexProps */
+           val size = idxPropsRaw.length
+           (0 until size).foreach { ith =>
+             val meta = index.sortKeyTypesArray(ith)
+             val (k, v) = idxPropsRaw(ith)
+             if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
+
+             if (k == LabelMeta.degree) {
+               edge.property(LabelMeta.degree.name, v.value, version)
+             } else {
+               edge.property(meta.name, v.value, version)
+             }
+           }
+
+           /** process props */
+           if (op == GraphUtil.operations("incrementCount")) {
+             //        val countVal = Bytes.toLong(kv.value)
+             val countVal = bytesToLongFunc(kv.value, 0)
+             edge.property(LabelMeta.count.name, countVal, version)
+           } else {
+             val (props, endAt) = bytesToKeyValues(kv.value, 0, 
kv.value.length, schemaVer, label)
+             props.foreach { case (k, v) =>
+               if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
+
+               edge.property(k.name, v.value, version)
+             }
+           }
+
+           /** process tgtVertexId */
+           val tgtVertexId =
+             if (edge.checkProperty(LabelMeta.to.name)) {
+               val vId = 
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+               TargetVertexId(ServiceColumn.Default, vId.innerVal)
+             } else tgtVertexIdRaw
+
+           edge.property(LabelMeta.timestamp.name, tsVal, version)
+           edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+           edge.op = op
+           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
          }
+         Option(edge)
        }
-
-       /** process props */
-       if (op == GraphUtil.operations("incrementCount")) {
-         //        val countVal = Bytes.toLong(kv.value)
-         val countVal = bytesToLongFunc(kv.value, 0)
-         edge.property(LabelMeta.count.name, countVal, version)
-       } else {
-         val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, 
schemaVer, label)
-         props.foreach { case (k, v) =>
-           if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
-
-           edge.property(k.name, v.value, version)
-         }
-       }
-
-       /** process tgtVertexId */
-       val tgtVertexId =
-         if (edge.checkProperty(LabelMeta.to.name)) {
-           val vId = 
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
-           TargetVertexId(ServiceColumn.Default, vId.innerVal)
-         } else tgtVertexIdRaw
-
-
-       edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-       edge.op = op
-       edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
-       edge
+     } catch {
+       case e: Exception =>
+         None
      }
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 31a1a89..09d7f4c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -19,12 +19,12 @@
 
 package org.apache.s2graph.core.storage.serde.indexedge.wide
 
-import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types._
-import org.apache.s2graph.core._
-import scala.collection.immutable
 
 class IndexEdgeDeserializable(graph: S2Graph,
                               bytesToLongFunc: (Array[Byte], Int) => Long = 
bytesToLong) extends Deserializable[S2Edge] {
@@ -33,105 +33,104 @@ class IndexEdgeDeserializable(graph: S2Graph,
    type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, 
Boolean, Int)
    type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
 
-   private def parseDegreeQualifier(kv: SKeyValue, schemaVer: String): 
QualifierRaw = {
-     //    val degree = Bytes.toLong(kv.value)
-     val degree = bytesToLongFunc(kv.value, 0)
-     val idxPropsRaw = Array(LabelMeta.degree -> InnerVal.withLong(degree, 
schemaVer))
-     val tgtVertexIdRaw = VertexId(ServiceColumn.Default, 
InnerVal.withStr("0", schemaVer))
-     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
-   }
-
-   private def parseQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw 
= {
-     var qualifierLen = 0
-     var pos = 0
-     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
-       val (props, endAt) = bytesToProps(kv.qualifier, pos, schemaVer)
-       pos = endAt
-       qualifierLen += endAt
-       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
-         (HBaseType.defaultTgtVertexId, 0)
-       } else {
-         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, 
schemaVer)
-       }
-       qualifierLen += tgtVertexIdLen
-       (props, endAt, tgtVertexId, tgtVertexIdLen)
-     }
-     val (op, opLen) =
-       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
-       else (kv.qualifier(qualifierLen), 1)
-
-     qualifierLen += opLen
-
-     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
-   }
-
-   override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
-                                                    _kvs: Seq[T],
-                                                    schemaVer: String,
-                                                    cacheElementOpt: 
Option[S2Edge]): S2Edge = {
-     assert(_kvs.size == 1)
-
-//     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
-     val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
-     val version = kv.timestamp
-
-//     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
-//       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
-//     }.getOrElse(parseRow(kv, schemaVer))
-     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = parseRow(kv, 
schemaVer)
-
-     val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
-     val srcVertex = graph.newVertex(srcVertexId, version)
-     //TODO:
-     val edge = graph.newEdge(srcVertex, null,
-       label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
S2Edge.EmptyState)
-     var tsVal = version
-
-     val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
-       if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
-       else parseQualifier(kv, schemaVer)
-
-     val index = label.indicesMap.getOrElse(labelIdxSeq, throw new 
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
-
-     /** process indexProps */
-     val size = idxPropsRaw.length
-     (0 until size).foreach { ith =>
-       val meta = index.sortKeyTypesArray(ith)
-       val (k, v) = idxPropsRaw(ith)
-       if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
-
-       if (k == LabelMeta.degree) {
-         edge.property(LabelMeta.degree.name, v.value, version)
-       } else {
-         edge.property(meta.name, v.value, version)
-       }
-     }
-
-     /** process props */
-     if (op == GraphUtil.operations("incrementCount")) {
-       //        val countVal = Bytes.toLong(kv.value)
-       val countVal = bytesToLongFunc(kv.value, 0)
-       edge.property(LabelMeta.count.name, countVal, version)
-     } else {
-       val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, 
schemaVer, label)
-       props.foreach { case (k, v) =>
-         if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
-
-         edge.property(k.name, v.value, version)
+   override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+                                               cacheElementOpt: 
Option[S2Edge]): Option[S2Edge] = {
+     try {
+       assert(_kvs.size == 1)
+
+       //     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+       val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+       val version = kv.timestamp
+
+       var pos = 0
+       val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, HBaseType.DEFAULT_VERSION)
+       pos += srcIdLen
+       val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+       pos += 4
+       val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+       pos += 1
+
+       if (isInverted) None
+       else {
+         val label = Label.findById(labelWithDir.labelId)
+         val schemaVer = label.schemaVersion
+         val srcVertex = graph.newVertex(srcVertexId, version)
+         //TODO:
+         val edge = graph.newEdge(srcVertex, null,
+           label, labelWithDir.dir, GraphUtil.defaultOpByte, version, 
S2Edge.EmptyState)
+         var tsVal = version
+
+         if (kv.qualifier.isEmpty) {
+           val degreeVal = bytesToLongFunc(kv.value, 0)
+           val tgtVertexId = VertexId(ServiceColumn.Default, 
InnerVal.withStr("0", schemaVer))
+
+           edge.property(LabelMeta.timestamp.name, version, version)
+           edge.property(LabelMeta.degree.name, degreeVal, version)
+           edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+           edge.op = GraphUtil.defaultOpByte
+           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+         } else {
+           pos = 0
+           val (idxPropsRaw, endAt) = bytesToProps(kv.qualifier, pos, 
schemaVer)
+           pos = endAt
+
+           val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == 
kv.qualifier.length) {
+             (HBaseType.defaultTgtVertexId, 0)
+           } else {
+             TargetVertexId.fromBytes(kv.qualifier, endAt, 
kv.qualifier.length, schemaVer)
+           }
+           pos += tgtVertexIdLen
+           val op =
+             if (kv.qualifier.length == pos) GraphUtil.defaultOpByte
+             else kv.qualifier(kv.qualifier.length-1)
+
+           val index = label.indicesMap.getOrElse(labelIdxSeq, throw new 
RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+
+           /** process indexProps */
+           val size = idxPropsRaw.length
+           (0 until size).foreach { ith =>
+             val meta = index.sortKeyTypesArray(ith)
+             val (k, v) = idxPropsRaw(ith)
+             if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
+
+             if (k == LabelMeta.degree) {
+               edge.property(LabelMeta.degree.name, v.value, version)
+             } else {
+               edge.property(meta.name, v.value, version)
+             }
+           }
+
+           /** process props */
+           if (op == GraphUtil.operations("incrementCount")) {
+             //        val countVal = Bytes.toLong(kv.value)
+             val countVal = bytesToLongFunc(kv.value, 0)
+             edge.property(LabelMeta.count.name, countVal, version)
+           } else {
+             val (props, endAt) = bytesToKeyValues(kv.value, 0, 
kv.value.length, schemaVer, label)
+             props.foreach { case (k, v) =>
+               if (k == LabelMeta.timestamp) tsVal = 
v.value.asInstanceOf[BigDecimal].longValue()
+
+               edge.property(k.name, v.value, version)
+             }
+           }
+           /** process tgtVertexId */
+           val tgtVertexId =
+             if (edge.checkProperty(LabelMeta.to.name)) {
+               val vId = 
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
+               TargetVertexId(ServiceColumn.Default, vId.innerVal)
+             } else tgtVertexIdRaw
+
+           edge.property(LabelMeta.timestamp.name, tsVal, version)
+           edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+           edge.op = op
+           edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+         }
+
+         Option(edge)
        }
+     } catch {
+       case e: Exception => None
      }
-     /** process tgtVertexId */
-     val tgtVertexId =
-       if (edge.checkProperty(LabelMeta.to.name)) {
-         val vId = 
edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
-         TargetVertexId(ServiceColumn.Default, vId.innerVal)
-       } else tgtVertexIdRaw
-
-     edge.property(LabelMeta.timestamp.name, tsVal, version)
-     edge.tgtVertex = graph.newVertex(tgtVertexId, version)
-     edge.op = op
-     edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
-     edge
    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index f4802c0..3b55ed8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -23,8 +23,8 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelIndex, 
LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, 
SKeyValue, StorageDeserializable}
-import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, 
SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core._
 
 class SnapshotEdgeDeserializable(graph: S2Graph) extends 
Deserializable[SnapshotEdge] {
 
@@ -34,72 +34,76 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot
     (statusCode.toByte, op.toByte)
   }
 
-  override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
-                                                   _kvs: Seq[T],
-                                                   version: String,
-                                                   cacheElementOpt: 
Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
+  override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+                                              cacheElementOpt: 
Option[SnapshotEdge]): Option[SnapshotEdge] = {
+    try {
+      val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+      assert(kvs.size == 1)
 
-    val kv = kvs.head
-    val label = checkLabel.get
-    val schemaVer = label.schemaVersion
-    val cellVersion = kv.timestamp
-    /** rowKey */
-    def parseRowV3(kv: SKeyValue, version: String) = {
+      val kv = kvs.head
+      val version = kv.timestamp
       var pos = 0
-      val (srcIdAndTgtId, srcIdAndTgtIdLen) = 
SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
-      pos += srcIdAndTgtIdLen
+      val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, HBaseType.DEFAULT_VERSION)
+      pos += srcIdLen
+
+      val isTallSchema = pos + 5 != kv.row.length
+      var tgtVertexId = TargetVertexId(ServiceColumn.Default, 
srcVertexId.innerId)
+
+      if (isTallSchema) {
+        val (tgtId, tgtBytesLen) = InnerVal.fromBytes(kv.row, pos, 
kv.row.length, HBaseType.DEFAULT_VERSION)
+        tgtVertexId = TargetVertexId(ServiceColumn.Default, tgtId)
+        pos += tgtBytesLen
+      }
+
       val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
       pos += 4
       val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+      pos += 1
 
-      val rowLen = srcIdAndTgtIdLen + 4 + 1
-      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, 
labelIdxSeq, isInverted, rowLen)
+      if (!isInverted) None
+      else {
+        val label = Label.findById(labelWithDir.labelId)
+        val schemaVer = label.schemaVersion
+//        val srcVertexId = SourceVertexId(ServiceColumn.Default, srcIdAndTgtId.srcInnerId)
+//        val tgtVertexId = SourceVertexId(ServiceColumn.Default, tgtId.tgtInnerId)
 
-    }
-    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map 
{ e =>
-      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, 
LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRowV3(kv, schemaVer))
+        var pos = 0
+        val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+        pos += 1
+        val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, 
label)
+        val kvsMap = props.toMap
+        val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+        val ts = tsInnerVal.toString.toLong
+        pos = endAt
 
-    val srcVertexId = SourceVertexId(ServiceColumn.Default, srcInnerId)
-    val tgtVertexId = SourceVertexId(ServiceColumn.Default, tgtInnerId)
+        val _pendingEdgeOpt =
+          if (pos == kv.value.length) None
+          else {
+            val (pendingEdgeStatusCode, pendingEdgeOp) = 
statusCodeWithOp(kv.value(pos))
+            pos += 1
+            //          val versionNum = Bytes.toLong(kv.value, pos, 8)
+            //          pos += 8
+            val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, 
pos, schemaVer, label)
+            pos = endAt
+            val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
-    val (props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
-      var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
-      pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, 
label)
-      val kvsMap = props.toMap
-      val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
-      val ts = tsInnerVal.toString.toLong
+            val pendingEdge =
+              graph.newEdge(graph.newVertex(srcVertexId, version),
+                graph.newVertex(tgtVertexId, version),
+                label, labelWithDir.dir, pendingEdgeOp,
+                version, pendingEdgeProps.toMap,
+                statusCode = pendingEdgeStatusCode, lockTs = lockTs, 
tsInnerValOpt = Option(tsInnerVal))
+            Option(pendingEdge)
+          }
 
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = 
statusCodeWithOp(kv.value(pos))
-          pos += 1
-          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
-          //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, 
pos, schemaVer, label)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+        val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, 
ts), graph.newVertex(tgtVertexId, ts),
+          label, labelWithDir.dir, op, version, props.toMap, statusCode = 
statusCode,
+          pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = 
Option(tsInnerVal))
 
-          val pendingEdge =
-            graph.newEdge(graph.newVertex(srcVertexId, cellVersion),
-              graph.newVertex(tgtVertexId, cellVersion),
-              label, labelWithDir.dir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs, 
tsInnerValOpt = Option(tsInnerVal))
-          Option(pendingEdge)
-        }
-
-      (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
+        Option(snapshotEdge)
+      }
+    } catch {
+      case e: Exception => None
     }
-
-    graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), 
graph.newVertex(tgtVertexId, ts),
-      label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = 
Option(tsInnerVal))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index d3dec1e..78ac2f7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -23,8 +23,8 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, 
StorageDeserializable}
-import org.apache.s2graph.core.types.TargetVertexId
-import org.apache.s2graph.core.{S2Graph, S2Edge, SnapshotEdge, S2Vertex}
+import org.apache.s2graph.core.types.{LabelWithDirection, HBaseType, 
SourceVertexId, TargetVertexId}
+import org.apache.s2graph.core._
 
 class SnapshotEdgeDeserializable(graph: S2Graph) extends 
Deserializable[SnapshotEdge] {
 
@@ -34,58 +34,67 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot
     (statusCode.toByte, op.toByte)
   }
 
-  override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
-                                                   _kvs: Seq[T],
-                                                   version: String,
-                                                   cacheElementOpt: 
Option[SnapshotEdge]): SnapshotEdge = {
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-    assert(kvs.size == 1)
+  override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+                                              cacheElementOpt: 
Option[SnapshotEdge]): Option[SnapshotEdge] = {
+    try {
+      val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+      assert(kvs.size == 1)
 
-    val kv = kvs.head
-    val label = checkLabel.get
-    val schemaVer = label.schemaVersion
-    val cellVersion = kv.timestamp
-
-    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
-      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
-    }.getOrElse(parseRow(kv, schemaVer))
-
-    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) 
= {
-      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, 
kv.qualifier.length, schemaVer)
+      val kv = kvs.head
+      val version = kv.timestamp
       var pos = 0
-      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, 
kv.row.length, HBaseType.DEFAULT_VERSION)
+      pos += srcIdLen
+      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+      pos += 4
+      val (labelIdxSeq, isInverted) = 
bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
       pos += 1
-      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, 
label)
-      val kvsMap = props.toMap
-      val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
-      val ts = tsInnerVal.toString.toLong
 
-      pos = endAt
-      val _pendingEdgeOpt =
-        if (pos == kv.value.length) None
-        else {
-          val (pendingEdgeStatusCode, pendingEdgeOp) = 
statusCodeWithOp(kv.value(pos))
-          pos += 1
-          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
-          //          pos += 8
-          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, 
pos, schemaVer, label)
-          pos = endAt
-          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+      if (!isInverted) None
+      else {
+        val label = Label.findById(labelWithDir.labelId)
+        val schemaVer = label.schemaVersion
+        val srcVertex = graph.newVertex(srcVertexId, version)
 
-          val pendingEdge =
-            graph.newEdge(graph.newVertex(srcVertexId, cellVersion),
-              graph.newVertex(tgtVertexId, cellVersion),
-              label, labelWithDir.dir, pendingEdgeOp,
-              cellVersion, pendingEdgeProps.toMap,
-              statusCode = pendingEdgeStatusCode, lockTs = lockTs, 
tsInnerValOpt = Option(tsInnerVal))
-          Option(pendingEdge)
-        }
+        val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, 
kv.qualifier.length, schemaVer)
 
-      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
-    }
+        var pos = 0
+        val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+        pos += 1
+        val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, 
label)
+        val kvsMap = props.toMap
+        val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+        val ts = tsInnerVal.toString.toLong
+        pos = endAt
+
+        val _pendingEdgeOpt =
+          if (pos == kv.value.length) None
+          else {
+            val (pendingEdgeStatusCode, pendingEdgeOp) = 
statusCodeWithOp(kv.value(pos))
+            pos += 1
+            //          val versionNum = Bytes.toLong(kv.value, pos, 8)
+            //          pos += 8
+            val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, 
pos, schemaVer, label)
+            pos = endAt
+            val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
-    graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), 
graph.newVertex(tgtVertexId, ts),
-      label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
-      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = 
Option(tsInnerVal))
+            val pendingEdge =
+              graph.newEdge(graph.newVertex(srcVertexId, version),
+                graph.newVertex(tgtVertexId, version),
+                label, labelWithDir.dir, pendingEdgeOp,
+                version, pendingEdgeProps.toMap,
+                statusCode = pendingEdgeStatusCode, lockTs = lockTs, 
tsInnerValOpt = Option(tsInnerVal))
+            Option(pendingEdge)
+          }
+
+        val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, 
ts), graph.newVertex(tgtVertexId, ts),
+          label, labelWithDir.dir, op, version, props.toMap, statusCode = 
statusCode,
+          pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = 
Option(tsInnerVal))
+
+        Option(snapshotEdge)
+      }
+    } catch {
+      case e: Exception => None
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index ee93505..b4a00e6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -22,49 +22,51 @@ package org.apache.s2graph.core.storage.serde.vertex
 import org.apache.s2graph.core.mysqls.{ColumnMeta, Label}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, 
VertexId}
 import org.apache.s2graph.core.{S2Graph, QueryParam, S2Vertex}
 
 import scala.collection.mutable.ListBuffer
 
 class VertexDeserializable(graph: S2Graph,
                            bytesToInt: (Array[Byte], Int) => Int = bytesToInt) 
extends Deserializable[S2Vertex] {
-  def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
-                                          _kvs: Seq[T],
-                                          version: String,
-                                          cacheElementOpt: Option[S2Vertex]): 
S2Vertex = {
+  def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
+                                          cacheElementOpt: Option[S2Vertex]): 
Option[S2Vertex] = {
+    try {
+      val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
 
-    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+      val kv = kvs.head
+      val version = HBaseType.DEFAULT_VERSION
+      val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
 
-    val kv = kvs.head
-    val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
+      var maxTs = Long.MinValue
+      val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike]
+      val belongLabelIds = new ListBuffer[Int]
 
-    var maxTs = Long.MinValue
-    val propsMap = new collection.mutable.HashMap[ColumnMeta, InnerValLike]
-    val belongLabelIds = new ListBuffer[Int]
+      for {
+        kv <- kvs
+      } {
+        val propKey =
+          if (kv.qualifier.length == 1) kv.qualifier.head.toInt
+          else bytesToInt(kv.qualifier, 0)
 
-    for {
-      kv <- kvs
-    } {
-      val propKey =
-        if (kv.qualifier.length == 1) kv.qualifier.head.toInt
-        else bytesToInt(kv.qualifier, 0)
+        val ts = kv.timestamp
+        if (ts > maxTs) maxTs = ts
 
-      val ts = kv.timestamp
-      if (ts > maxTs) maxTs = ts
-
-      if (S2Vertex.isLabelId(propKey)) {
-        belongLabelIds += S2Vertex.toLabelId(propKey)
-      } else {
-        val v = kv.value
-        val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
-        val columnMeta = vertexId.column.metasMap(propKey)
-        propsMap += (columnMeta -> value)
+        if (S2Vertex.isLabelId(propKey)) {
+          belongLabelIds += S2Vertex.toLabelId(propKey)
+        } else {
+          val v = kv.value
+          val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
+          val columnMeta = vertexId.column.metasMap(propKey)
+          propsMap += (columnMeta -> value)
+        }
       }
+      assert(maxTs != Long.MinValue)
+      val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, 
belongLabelIds = belongLabelIds)
+      S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
+      Option(vertex)
+    } catch {
+      case e: Exception => None
     }
-    assert(maxTs != Long.MinValue)
-    val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, 
belongLabelIds = belongLabelIds)
-    S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
-    vertex
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index b885bc6..77951c0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -26,7 +26,8 @@ object HBaseType {
   val VERSION4 = "v4"
   val VERSION3 = "v3"
   val VERSION2 = "v2"
-  val VERSION1 = "v1"
+  val ValidVersions = List(VERSION4, VERSION3, VERSION2)
+//  val VERSION1 = "v1"
 //  val DEFAULT_VERSION = VERSION2
   val DEFAULT_VERSION = VERSION3
 //  val EMPTY_SEQ_BYTE = Byte.MaxValue

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
index ea7aa41..c37728c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
@@ -84,7 +84,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
                 isVertexId: Boolean): (InnerValLike, Int) = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, 
offset, len, version, isVertexId)
-      case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, 
isVertexId)
+//      case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, 
isVertexId)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -92,7 +92,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withLong(l: Long, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l))
-      case VERSION1 => v1.InnerVal(Some(l), None, None)
+//      case VERSION1 => v1.InnerVal(Some(l), None, None)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -100,7 +100,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withInt(i: Int, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i))
-      case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
+//      case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -108,7 +108,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withFloat(f: Float, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => 
v2.InnerVal(BigDecimal(f.toDouble))
-      case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
+//      case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -116,7 +116,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withDouble(d: Double, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d))
-      case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
+//      case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -124,7 +124,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withNumber(num: BigDecimal, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num)
-      case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
+//      case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -132,7 +132,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withBoolean(b: Boolean, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b)
-      case VERSION1 => v1.InnerVal(None, None, Some(b))
+//      case VERSION1 => v1.InnerVal(None, None, Some(b))
       case _ => throw notSupportedEx(version)
     }
   }
@@ -147,7 +147,7 @@ object InnerVal extends HBaseDeserializableWithIsVertexId {
   def withStr(s: String, version: String): InnerValLike = {
     version match {
       case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s)
-      case VERSION1 => v1.InnerVal(None, Some(s), None)
+//      case VERSION1 => v1.InnerVal(None, Some(s), None)
       case _ => throw notSupportedEx(version)
     }
   }
@@ -162,40 +162,40 @@ object InnerVal extends HBaseDeserializableWithIsVertexId 
{
 //  }
 
   /** nasty implementation for backward compatability */
-  def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: 
String): InnerValLike = {
-    val ret = toVersion match {
-      case VERSION2 | VERSION3 | VERSION4 =>
-        if (innerVal.isInstanceOf[v1.InnerVal]) {
-          val obj = innerVal.asInstanceOf[v1.InnerVal]
-          obj.valueType match {
-            case "long" => InnerVal.withLong(obj.longV.get, toVersion)
-            case "string" => InnerVal.withStr(obj.strV.get, toVersion)
-            case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion)
-            case _ => throw new Exception(s"InnerVal should be 
[long/integeer/short/byte/string/boolean]")
-          }
-        } else {
-          innerVal
-        }
-      case VERSION1 =>
-        if (innerVal.isInstanceOf[v2.InnerVal]) {
-          val obj = innerVal.asInstanceOf[v2.InnerVal]
-          obj.value match {
-            case str: String => InnerVal.withStr(str, toVersion)
-            case b: Boolean => InnerVal.withBoolean(b, toVersion)
-            case n: BigDecimal => InnerVal.withNumber(n, toVersion)
-            case n: Long => InnerVal.withNumber(n, toVersion)
-            case n: Double => InnerVal.withNumber(n, toVersion)
-            case n: Int => InnerVal.withNumber(n, toVersion)
-            case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion")
-          }
-        } else {
-          innerVal
-        }
-      case _ => throw notSupportedEx(toVersion)
-    }
-//    logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret, 
${innerVal.bytes.toList}, ${ret.bytes.toList}")
-    ret
-  }
+//  def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: 
String): InnerValLike = {
+//    val ret = toVersion match {
+//      case VERSION2 | VERSION3 | VERSION4 =>
+//        if (innerVal.isInstanceOf[v1.InnerVal]) {
+//          val obj = innerVal.asInstanceOf[v1.InnerVal]
+//          obj.valueType match {
+//            case "long" => InnerVal.withLong(obj.longV.get, toVersion)
+//            case "string" => InnerVal.withStr(obj.strV.get, toVersion)
+//            case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion)
+//            case _ => throw new Exception(s"InnerVal should be 
[long/integeer/short/byte/string/boolean]")
+//          }
+//        } else {
+//          innerVal
+//        }
+////      case VERSION1 =>
+////        if (innerVal.isInstanceOf[v2.InnerVal]) {
+////          val obj = innerVal.asInstanceOf[v2.InnerVal]
+////          obj.value match {
+////            case str: String => InnerVal.withStr(str, toVersion)
+////            case b: Boolean => InnerVal.withBoolean(b, toVersion)
+////            case n: BigDecimal => InnerVal.withNumber(n, toVersion)
+////            case n: Long => InnerVal.withNumber(n, toVersion)
+////            case n: Double => InnerVal.withNumber(n, toVersion)
+////            case n: Int => InnerVal.withNumber(n, toVersion)
+////            case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion")
+////          }
+////        } else {
+////          innerVal
+////        }
+//      case _ => throw notSupportedEx(toVersion)
+//    }
+////    logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret, 
${innerVal.bytes.toList}, ${ret.bytes.toList}")
+//    ret
+//  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index d280570..019ef67 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -298,7 +298,7 @@ trait IntegrateCommon extends FunSuite with Matchers with 
BeforeAndAfterAll {
     ],
     "consistencyLevel": "strong",
     "isDirected": true,
-    "schemaVersion": "v1",
+    "schemaVersion": "v2",
     "compressionAlgorithm": "gz"
   }"""
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
index bab6e03..7228c5d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/JsonParserTest.scala
@@ -30,7 +30,7 @@ class JsonParserTest extends FunSuite with Matchers with 
TestCommon {
   import types.HBaseType._
 
   val innerValsPerVersion = for {
-    version <- List(VERSION2, VERSION1)
+    version <- ValidVersions
   } yield {
       val innerVals = List(
         (InnerVal.withStr("ABC123", version), STRING),

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
index 0f8f833..3f6cd2a 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
@@ -44,15 +44,15 @@ trait TestCommon {
   def lessThanEqual(x: Array[Byte], y: Array[Byte]) = Bytes.compareTo(x, y) <= 0
 
   /** */
-  import HBaseType.{VERSION1, VERSION2}
-  private val tsValSmall = InnerVal.withLong(ts, VERSION1)
-  private val tsValLarge = InnerVal.withLong(ts + 1, VERSION1)
-  private val boolValSmall = InnerVal.withBoolean(false, VERSION1)
-  private val boolValLarge = InnerVal.withBoolean(true, VERSION1)
-  private val doubleValSmall = InnerVal.withDouble(-0.1, VERSION1)
-  private val doubleValLarge = InnerVal.withDouble(0.1, VERSION1)
+  import HBaseType._
+  private val tsValSmall = InnerVal.withLong(ts, DEFAULT_VERSION)
+  private val tsValLarge = InnerVal.withLong(ts + 1, DEFAULT_VERSION)
+  private val boolValSmall = InnerVal.withBoolean(false, DEFAULT_VERSION)
+  private val boolValLarge = InnerVal.withBoolean(true, DEFAULT_VERSION)
+  private val doubleValSmall = InnerVal.withDouble(-0.1, DEFAULT_VERSION)
+  private val doubleValLarge = InnerVal.withDouble(0.1, DEFAULT_VERSION)
   private val toSeq = LabelMeta.toSeq.toInt
-  private val toVal = InnerVal.withLong(Long.MinValue, VERSION1)
+  private val toVal = InnerVal.withLong(Long.MinValue, DEFAULT_VERSION)
 
 
   private val tsValSmallV2 = InnerVal.withLong(ts, VERSION2)
@@ -65,19 +65,19 @@ trait TestCommon {
 
   val intVals = (Int.MinValue until Int.MinValue + 10) ++ (-129 to -126) ++ 
(-1 to 1) ++ (126 to 129) ++
     (Int.MaxValue - 10 until Int.MaxValue)
-  val intInnerVals = intVals.map { v => InnerVal.withNumber(BigDecimal(v), 
VERSION1) }
+  val intInnerVals = intVals.map { v => InnerVal.withNumber(BigDecimal(v), 
DEFAULT_VERSION) }
 
   val intInnerValsV2 = intVals.map { v => InnerVal.withNumber(BigDecimal(v), 
VERSION2) }
 
   val stringVals = List("abc", "abd", "ac", "aca", "b")
-  val stringInnerVals = stringVals.map { s => InnerVal.withStr(s, VERSION1)}
+  val stringInnerVals = stringVals.map { s => InnerVal.withStr(s, 
DEFAULT_VERSION)}
   val stringInnerValsV2 = stringVals.map { s => InnerVal.withStr(s, VERSION2)}
 
   val numVals = (Long.MinValue until Long.MinValue + 10).map(BigDecimal(_)) ++
     (Int.MinValue until Int.MinValue + 10).map(BigDecimal(_)) ++
     (Int.MaxValue - 10 until Int.MaxValue).map(BigDecimal(_)) ++
     (Long.MaxValue - 10 until Long.MaxValue).map(BigDecimal(_))
-  val numInnerVals = numVals.map { n => InnerVal.withLong(n.toLong, VERSION1)}
+  val numInnerVals = numVals.map { n => InnerVal.withLong(n.toLong, 
DEFAULT_VERSION)}
   val numInnerValsV2 = numVals.map { n => InnerVal.withNumber(n, VERSION2)}
 
   val doubleStep = Double.MaxValue / 5
@@ -86,16 +86,16 @@ trait TestCommon {
     (-128.0 until 128.0 by 1.2).map(BigDecimal(_)) ++
     (129.0 until 142.0 by 1.1).map(BigDecimal(_)) ++
     (doubleStep until Double.MaxValue by doubleStep).map(BigDecimal(_))
-  val doubleInnerVals = doubleVals.map { d => InnerVal.withDouble(d.toDouble, 
VERSION1)}
+  val doubleInnerVals = doubleVals.map { d => InnerVal.withDouble(d.toDouble, 
DEFAULT_VERSION)}
   val doubleInnerValsV2 = doubleVals.map { d => 
InnerVal.withDouble(d.toDouble, VERSION2)}
 
   /** version 1 string order is broken */
   val idxPropsLs = Seq(
-    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ac", 
VERSION1)),(toSeq -> toVal)),
-    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ab", 
VERSION1)), (toSeq -> toVal)),
-    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2-> InnerVal.withStr("b", 
VERSION1)), (toSeq -> toVal)),
-    Seq((0 -> tsValSmall), (1 -> boolValLarge), (2 -> InnerVal.withStr("b", 
VERSION1)), (toSeq -> toVal)),
-    Seq((0 -> tsValLarge), (1 -> boolValSmall), (2 -> InnerVal.withStr("a", 
VERSION1)), (toSeq -> toVal))
+    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ac", 
DEFAULT_VERSION)),(toSeq -> toVal)),
+    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2 -> InnerVal.withStr("ab", 
DEFAULT_VERSION)), (toSeq -> toVal)),
+    Seq((0 -> tsValSmall), (1 -> boolValSmall), (2-> InnerVal.withStr("b", 
DEFAULT_VERSION)), (toSeq -> toVal)),
+    Seq((0 -> tsValSmall), (1 -> boolValLarge), (2 -> InnerVal.withStr("b", 
DEFAULT_VERSION)), (toSeq -> toVal)),
+    Seq((0 -> tsValLarge), (1 -> boolValSmall), (2 -> InnerVal.withStr("a", 
DEFAULT_VERSION)), (toSeq -> toVal))
   ).map(seq => seq.map(t => t._1.toByte -> t._2 ))
 
   val idxPropsLsV2 = Seq(

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 1997354..4614bed 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -138,7 +138,7 @@ trait TestCommonWithModels {
     implicit val session = AutoSession
 
     management.createLabel(labelName, serviceName, columnName, columnType, 
serviceName, columnName, columnType,
-      isDirected = true, serviceName, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4", None)
+      isDirected = true, serviceName, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)
 
     management.createLabel(labelNameV2, serviceNameV2, columnNameV2, 
columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = true, serviceNameV2, testIdxProps, testProps, 
consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4", None)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala 
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index 25dd0e4..ad9299c 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -32,7 +32,7 @@ import scala.util.{Random, Try}
 class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels 
{
   initTests()
 
-  import HBaseType.{VERSION1, VERSION2}
+  import HBaseType._
 
   val ts = System.currentTimeMillis()
   val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, 
label.schemaVersion)
@@ -60,7 +60,7 @@ class WhereParserTest extends FunSuite with Matchers with 
TestCommonWithModels {
   }
 
   def ids = for {
-    version <- Seq(VERSION1, VERSION2)
+    version <- ValidVersions
   } yield {
     val srcId = SourceVertexId(ServiceColumn.Default, InnerVal.withLong(1, 
version))
     val tgtId =

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7951f552/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index 5409d61..a5c974e 100644
--- 
a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++ 
b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -46,8 +46,7 @@ class IndexEdgeTest extends FunSuite with Matchers with 
TestCommonWithModels {
     val labelOpt = Option(l)
     val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, 
props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion)))
     val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq == 
LabelIndex.DefaultSeq).head
-    val _indexEdgeOpt = 
graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(labelOpt,
-      graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues, 
l.schemaVersion, None)
+    val _indexEdgeOpt = 
graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues,
 None)
 
     _indexEdgeOpt should not be empty
     edge == _indexEdgeOpt.get should be(true)

Reply via email to