http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..dacc37f
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -0,0 +1,53 @@
+package org.apache.s2graph.core.storage.serde.indexedge.tall
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+
+
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+   import StorageSerializable._
+
+   val label = indexEdge.label
+   val table = label.hbaseTableName.getBytes()
+   val cf = Serializable.edgeCf
+
+   val idxPropsMap = indexEdge.orders.toMap
+   val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+   override def toKeyValues: Seq[SKeyValue] = {
+     val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+     val labelWithDirBytes = indexEdge.labelWithDir.bytes
+     val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+     val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+     //    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+
+     val qualifier =
+       if (indexEdge.degreeEdge) Array.empty[Byte]
+       else
+         idxPropsMap.get(LabelMeta.toSeq) match {
+           case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)
+           case Some(vId) => idxPropsBytes
+         }
+
+     /** TODO search usage of op byte. if there is no, then remove opByte */
+     val rowBytes = Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+     //    val qualifierBytes = Array.fill(1)(indexEdge.op)
+     val qualifierBytes = Array.empty[Byte]
+
+     val value =
+       if (indexEdge.degreeEdge)
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+       else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+       else propsToKeyValues(indexEdge.metas.toSeq)
+
+     val kv = SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version)
+
+     //        logger.debug(s"[Ser]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
+     Seq(kv)
+   }
+ }
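
A minimal standalone sketch (not part of the commit; the byte values and object name are made up for illustration) of how the tall-format row above is assembled: source vertex id bytes, the 4-byte label-with-direction, one byte carrying the label index sequence and the isInverted flag, the default op byte, and finally the qualifier bytes.

object TallIndexEdgeRowLayoutSketch extends App {
  // Placeholders standing in for the real serialized forms used by toKeyValues above.
  val srcIdBytes        = Array[Byte](1, 2, 3)          // VertexId.toSourceVertexId(...).bytes
  val labelWithDirBytes = Array[Byte](0, 0, 0, 6)       // Bytes.toBytes(labelId << 2 | dir)
  val idxSeqByte        = Array[Byte]((1 << 1).toByte)  // labelOrderSeqWithIsInverted(1, isInverted = false)
  val opByte            = Array[Byte](0)                // GraphUtil.defaultOpByte stand-in
  val qualifier         = Array[Byte](9, 9)             // idx props ++ target vertex id bytes

  // Same ordering as the Bytes.add calls in toKeyValues above.
  val rowBytes = srcIdBytes ++ labelWithDirBytes ++ idxSeqByte ++ opByte ++ qualifier
  println(rowBytes.toList)
}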

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
new file mode 100644
index 0000000..8b540cd
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -0,0 +1,116 @@
+package org.apache.s2graph.core.storage.serde.indexedge.wide
+
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.storage.StorageDeserializable._
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+
+class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
+   import StorageDeserializable._
+
+   type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
+   type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+
+   private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     //    val degree = Bytes.toLong(kv.value)
+     val degree = bytesToLongFunc(kv.value, 0)
+     val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
+     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
+     (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+   }
+
+   private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
+     var qualifierLen = 0
+     var pos = 0
+     val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+       val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+       pos = endAt
+       qualifierLen += endAt
+       val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+         (HBaseType.defaultTgtVertexId, 0)
+       } else {
+         TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+       }
+       qualifierLen += tgtVertexIdLen
+       (props, endAt, tgtVertexId, tgtVertexIdLen)
+     }
+     val (op, opLen) =
+       if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+       else (kv.qualifier(qualifierLen), 1)
+
+     qualifierLen += opLen
+
+     (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+   }
+
+   private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
+     val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
+     (props, endAt)
+   }
+
+   private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
+     (Array.empty[(Byte, InnerValLike)], 0)
+   }
+
+   override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                    _kvs: Seq[T],
+                                                    version: String,
+                                                    cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+     assert(_kvs.size == 1)
+
+     val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+     val kv = kvs.head
+     val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+       (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+     }.getOrElse(parseRow(kv, version))
+
+     val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+       if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
+       else parseQualifier(kv, version)
+
+     val (props, _) = if (op == GraphUtil.operations("incrementCount")) {
+       //      val countVal = Bytes.toLong(kv.value)
+       val countVal = bytesToLongFunc(kv.value, 0)
+       val dummyProps = Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version))
+       (dummyProps, 8)
+     } else if (kv.qualifier.isEmpty) {
+       parseDegreeValue(kv, version)
+     } else {
+       parseValue(kv, version)
+     }
+
+     val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
+
+
+     //    assert(kv.qualifier.nonEmpty && index.metaSeqs.size == idxPropsRaw.size)
+
+     val idxProps = for {
+       (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
+     } yield {
+         if (k == LabelMeta.degreeSeq) k -> v
+         else seq -> v
+       }
+
+     val idxPropsMap = idxProps.toMap
+     val tgtVertexId = if (tgtVertexIdInQualifier) {
+       idxPropsMap.get(LabelMeta.toSeq) match {
+         case None => tgtVertexIdRaw
+         case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
+       }
+     } else tgtVertexIdRaw
+
+     val _mergedProps = (idxProps ++ props).toMap
+     val mergedProps =
+       if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
+       else _mergedProps + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))
+
+     //    logger.error(s"$mergedProps")
+     //    val ts = mergedProps(LabelMeta.timeStampSeq).toString().toLong
+
+     val ts = kv.timestamp
+     IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
+
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
new file mode 100644
index 0000000..6c70ae1
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -0,0 +1,52 @@
+package org.apache.s2graph.core.storage.serde.indexedge.wide
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import org.apache.s2graph.core.types.VertexId
+import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+
+class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+   import StorageSerializable._
+
+   val label = indexEdge.label
+   val table = label.hbaseTableName.getBytes()
+   val cf = Serializable.edgeCf
+
+   val idxPropsMap = indexEdge.orders.toMap
+   val idxPropsBytes = propsToBytes(indexEdge.orders)
+
+   override def toKeyValues: Seq[SKeyValue] = {
+     val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+     val labelWithDirBytes = indexEdge.labelWithDir.bytes
+     val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+     val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+     //    logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
+     val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes
+     val qualifier =
+       if (indexEdge.degreeEdge) Array.empty[Byte]
+       else {
+         if (indexEdge.op == GraphUtil.operations("incrementCount")) {
+           Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
+         } else {
+           idxPropsMap.get(LabelMeta.toSeq) match {
+             case None => Bytes.add(idxPropsBytes, tgtIdBytes)
+             case Some(vId) => idxPropsBytes
+           }
+         }
+       }
+
+
+     val value =
+       if (indexEdge.degreeEdge)
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
+       else if (indexEdge.op == GraphUtil.operations("incrementCount"))
+         Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
+       else propsToKeyValues(indexEdge.metas.toSeq)
+
+     val kv = SKeyValue(table, row, cf, qualifier, value, indexEdge.version)
+
+     Seq(kv)
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..37d5910
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,84 @@
+package org.apache.s2graph.core.storage.serde.snapshotedge.tall
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.storage.StorageDeserializable._
+import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
+import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+
+  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+    val statusCode = byte >> 4
+    val op = byte & ((1 << 4) - 1)
+    (statusCode.toByte, op.toByte)
+  }
+
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+    assert(kvs.size == 1)
+
+    val kv = kvs.head
+    val schemaVer = queryParam.label.schemaVersion
+    val cellVersion = kv.timestamp
+    /** rowKey */
+    def parseRowV3(kv: SKeyValue, version: String) = {
+      var pos = 0
+      val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
+      pos += srcIdAndTgtIdLen
+      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+      pos += 4
+      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+
+      val rowLen = srcIdAndTgtIdLen + 4 + 1
+      (srcIdAndTgtId.srcInnerId, srcIdAndTgtId.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, rowLen)
+
+    }
+    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+    }.getOrElse(parseRowV3(kv, schemaVer))
+
+    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
+    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
+
+    val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+      var pos = 0
+      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      pos += 1
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val kvsMap = props.toMap
+      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+      pos = endAt
+      val _pendingEdgeOpt =
+        if (pos == kv.value.length) None
+        else {
+          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+          pos += 1
+          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
+          //          pos += 8
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          pos = endAt
+          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+          val pendingEdge =
+            Edge(Vertex(srcVertexId, cellVersion),
+              Vertex(tgtVertexId, cellVersion),
+              labelWithDir, pendingEdgeOp,
+              cellVersion, pendingEdgeProps.toMap,
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+          Option(pendingEdge)
+        }
+
+      (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+    }
+
+    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+      labelWithDir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+  }
+}
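
The first value byte packs the status code into the high four bits and the op code into the low four bits; statusCodeWithOp above reverses that packing. A minimal standalone sketch (not part of the commit), assuming both codes fit in four bits and the status code is small enough that the packed byte stays non-negative, which matches how the deserializer unpacks it with an arithmetic shift:

object StatusCodeWithOpSketch extends App {
  def pack(statusCode: Byte, op: Byte): Byte = ((statusCode << 4) | op).toByte
  def unpack(byte: Byte): (Byte, Byte)       = ((byte >> 4).toByte, (byte & 0x0F).toByte)

  val packed = pack(2.toByte, 3.toByte)  // status code 2, op 3 -> 0x23
  assert(unpack(packed) == (2.toByte, 3.toByte))
}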

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..f018827
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -0,0 +1,47 @@
+package org.apache.s2graph.core.storage.serde.snapshotedge.tall
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.SnapshotEdge
+import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
+
+
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+  import StorageSerializable._
+
+  val label = snapshotEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
+    val byte = (((statusCode << 4) | op).toByte)
+    Array.fill(1)(byte.toByte)
+  }
+  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
+    propsToKeyValuesWithTs(snapshotEdge.props.toList))
+
+  override def toKeyValues: Seq[SKeyValue] = {
+    val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
+    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+
+    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+    val qualifier = Array.empty[Byte]
+
+    val value = snapshotEdge.pendingEdgeOpt match {
+      case None => valueBytes()
+      case Some(pendingEdge) =>
+        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+        val versionBytes = Array.empty[Byte]
+        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
+
+        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
+    }
+
+    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
+    Seq(kv)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
new file mode 100644
index 0000000..68eb125
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -0,0 +1,70 @@
+package org.apache.s2graph.core.storage.serde.snapshotedge.wide
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.storage.StorageDeserializable._
+import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
+import org.apache.s2graph.core.types.TargetVertexId
+import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+
+class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
+
+  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
+    val statusCode = byte >> 4
+    val op = byte & ((1 << 4) - 1)
+    (statusCode.toByte, op.toByte)
+  }
+
+  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                                   _kvs: Seq[T],
+                                                   version: String,
+                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+    assert(kvs.size == 1)
+
+    val kv = kvs.head
+    val schemaVer = queryParam.label.schemaVersion
+    val cellVersion = kv.timestamp
+
+    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
+      (e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
+    }.getOrElse(parseRow(kv, schemaVer))
+
+    val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+      val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
+      var pos = 0
+      val (statusCode, op) = statusCodeWithOp(kv.value(pos))
+      pos += 1
+      val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+      val kvsMap = props.toMap
+      val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+
+      pos = endAt
+      val _pendingEdgeOpt =
+        if (pos == kv.value.length) None
+        else {
+          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(pos))
+          pos += 1
+          //          val versionNum = Bytes.toLong(kv.value, pos, 8)
+          //          pos += 8
+          val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+          pos = endAt
+          val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
+
+          val pendingEdge =
+            Edge(Vertex(srcVertexId, cellVersion),
+              Vertex(tgtVertexId, cellVersion),
+              labelWithDir, pendingEdgeOp,
+              cellVersion, pendingEdgeProps.toMap,
+              statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+          Option(pendingEdge)
+        }
+
+      (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+    }
+
+    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+      labelWithDir, op, cellVersion, props, statusCode = statusCode,
+      pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
new file mode 100644
index 0000000..e4d0ac1
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -0,0 +1,51 @@
+package org.apache.s2graph.core.storage.serde.snapshotedge.wide
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.SnapshotEdge
+import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import org.apache.s2graph.core.types.VertexId
+
+
+
+/**
+ * Serializes the given snapshot edge into wide-format key values.
+ * @param snapshotEdge the snapshot edge to serialize
+ */
+class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
+  import StorageSerializable._
+
+  val label = snapshotEdge.label
+  val table = label.hbaseTableName.getBytes()
+  val cf = Serializable.edgeCf
+
+  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
+    val byte = (((statusCode << 4) | op).toByte)
+    Array.fill(1)(byte.toByte)
+  }
+  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
+    propsToKeyValuesWithTs(snapshotEdge.props.toList))
+
+  override def toKeyValues: Seq[SKeyValue] = {
+    val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
+    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
+    val labelIndexSeqWithIsInvertedBytes = labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)
+
+    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+    val tgtIdBytes = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes
+
+    val qualifier = tgtIdBytes
+
+    val value = snapshotEdge.pendingEdgeOpt match {
+      case None => valueBytes()
+      case Some(pendingEdge) =>
+        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+        val versionBytes = Array.empty[Byte]
+        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+        val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
+        Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
+    }
+    val kv = SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version)
+    Seq(kv)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
new file mode 100644
index 0000000..00a5dc2
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -0,0 +1,47 @@
+package org.apache.s2graph.core.storage.serde.vertex
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
+import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
+import org.apache.s2graph.core.{QueryParam, Vertex}
+
+import scala.collection.mutable.ListBuffer
+
+class VertexDeserializable extends Deserializable[Vertex] {
+  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+                                     _kvs: Seq[T],
+                                     version: String,
+                                     cacheElementOpt: Option[Vertex]): Vertex = {
+
+    val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+    val kv = kvs.head
+    val (vertexId, _) = VertexId.fromBytes(kv.row, 0, kv.row.length, version)
+
+    var maxTs = Long.MinValue
+    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
+    val belongLabelIds = new ListBuffer[Int]
+
+    for {
+      kv <- kvs
+    } {
+      val propKey =
+        if (kv.qualifier.length == 1) kv.qualifier.head.toInt
+        else Bytes.toInt(kv.qualifier)
+
+      val ts = kv.timestamp
+      if (ts > maxTs) maxTs = ts
+
+      if (Vertex.isLabelId(propKey)) {
+        belongLabelIds += Vertex.toLabelId(propKey)
+      } else {
+        val v = kv.value
+        val (value, _) = InnerVal.fromBytes(v, 0, v.length, version)
+        propsMap += (propKey -> value)
+      }
+    }
+    assert(maxTs != Long.MinValue)
+    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
new file mode 100644
index 0000000..a81a86e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -0,0 +1,19 @@
+package org.apache.s2graph.core.storage.serde.vertex
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.Vertex
+import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
+
+case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+
+  val cf = Serializable.vertexCf
+
+  override def toKeyValues: Seq[SKeyValue] = {
+    val row = vertex.id.bytes
+    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
+    val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+    (base ++ belongsTo).map { case (qualifier, value) =>
+      SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
+    } toSeq
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
new file mode 100644
index 0000000..a207547
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -0,0 +1,160 @@
+package org.apache.s2graph.core.types
+
+import org.apache.hadoop.hbase.util.Bytes
+
+object HBaseType {
+  val VERSION4 = "v4"
+  val VERSION3 = "v3"
+  val VERSION2 = "v2"
+  val VERSION1 = "v1"
+//  val DEFAULT_VERSION = VERSION2
+  val DEFAULT_VERSION = VERSION3
+  val EMPTY_SEQ_BYTE = Byte.MaxValue
+  val DEFAULT_COL_ID = 0
+  val bitsForDir = 2
+  val maxBytes = Bytes.toBytes(Int.MaxValue)
+  val toSeqByte = -5.toByte
+  val defaultTgtVertexId = null
+}
+
+object HBaseDeserializable {
+
+  import HBaseType._
+
+  // 6 bits are used for the index sequence, so the number of indices per label is limited to 2^6
+  def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
+    val byte = bytes(offset)
+    val isInverted = if ((byte & 1) != 0) true else false
+    val labelOrderSeq = byte >> 1
+    (labelOrderSeq.toByte, isInverted)
+  }
+
+  def bytesToKeyValues(bytes: Array[Byte],
+                       offset: Int,
+                       length: Int,
+                       version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = {
+    var pos = offset
+    val len = bytes(pos)
+    pos += 1
+    val kvs = new Array[(Byte, InnerValLike)](len)
+    var i = 0
+    while (i < len) {
+      val k = bytes(pos)
+      pos += 1
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      pos += numOfBytesUsed
+      kvs(i) = (k -> v)
+      i += 1
+    }
+    val ret = (kvs, pos)
+    //    logger.debug(s"bytesToProps: $ret")
+    ret
+  }
+
+  def bytesToKeyValuesWithTs(bytes: Array[Byte],
+                             offset: Int,
+                             version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLikeWithTs)], Int) = {
+    var pos = offset
+    val len = bytes(pos)
+    pos += 1
+    val kvs = new Array[(Byte, InnerValLikeWithTs)](len)
+    var i = 0
+    while (i < len) {
+      val k = bytes(pos)
+      pos += 1
+      val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, 
version)
+      pos += numOfBytesUsed
+      kvs(i) = (k -> v)
+      i += 1
+    }
+    val ret = (kvs, pos)
+    //    logger.debug(s"bytesToProps: $ret")
+    ret
+  }
+
+  def bytesToProps(bytes: Array[Byte],
+                   offset: Int,
+                   version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = {
+    var pos = offset
+    val len = bytes(pos)
+    pos += 1
+    val kvs = new Array[(Byte, InnerValLike)](len)
+    var i = 0
+    while (i < len) {
+      val k = EMPTY_SEQ_BYTE
+      val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+      pos += numOfBytesUsed
+      kvs(i) = (k -> v)
+      i += 1
+    }
+    //    logger.error(s"bytesToProps: $kvs")
+    val ret = (kvs, pos)
+
+    ret
+  }
+}
+
+object HBaseSerializable {
+  def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+    val len = props.length
+    assert(len < Byte.MaxValue)
+    var bytes = Array.fill(1)(len.toByte)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes)
+    bytes
+  }
+
+  def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+    val len = props.length
+    assert(len < Byte.MaxValue)
+    var bytes = Array.fill(1)(len.toByte)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    bytes
+  }
+
+  def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = {
+    val len = props.length
+    assert(len < Byte.MaxValue)
+    var bytes = Array.fill(1)(len.toByte)
+    for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+    bytes
+  }
+
+  def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = {
+    assert(labelOrderSeq < (1 << 6))
+    val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
+    Array.fill(1)(byte.toByte)
+  }
+}
+
+trait HBaseSerializable {
+  def bytes: Array[Byte]
+}
+
+trait HBaseDeserializable {
+
+  import HBaseType._
+
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION): (HBaseSerializable, Int)
+
+  //  def fromBytesWithIndex(bytes: Array[Byte],
+  //                offset: Int,
+  //                len: Int,
+  //                version: String = DEFAULT_VERSION): (HBaseSerializable, Int)
+  def notSupportedEx(version: String) = new RuntimeException(s"not supported version, $version")
+}
+
+trait HBaseDeserializableWithIsVertexId {
+
+  import HBaseType._
+
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION,
+                isVertexId: Boolean = false): (HBaseSerializable, Int)
+
+  def notSupportedEx(version: String) = new RuntimeException(s"not supported version, $version")
+}
\ No newline at end of file
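
propsToKeyValues and bytesToKeyValues above use a count-prefixed layout: one byte for the number of properties, then a key byte followed by the value bytes for each property. A minimal standalone sketch (not part of the commit; real values are variable-length InnerVal encodings, here a fixed one-byte value is used just to show the layout):

object PropsToKeyValuesSketch extends App {
  // Encode: count byte, then (key byte ++ value bytes) per property.
  def propsToKeyValues(props: Seq[(Byte, Array[Byte])]): Array[Byte] =
    props.foldLeft(Array(props.length.toByte)) { case (acc, (k, v)) => acc ++ Array(k) ++ v }

  // Decode, assuming every value is exactly valueLen bytes long.
  def bytesToKeyValues(bytes: Array[Byte], valueLen: Int): Seq[(Byte, Array[Byte])] = {
    val len = bytes(0)
    (0 until len).map { i =>
      val pos = 1 + i * (1 + valueLen)
      bytes(pos) -> bytes.slice(pos + 1, pos + 1 + valueLen)
    }
  }

  val encoded = propsToKeyValues(Seq(1.toByte -> Array[Byte](10), 2.toByte -> Array[Byte](20)))
  assert(bytesToKeyValues(encoded, valueLen = 1).map(_._1) == Seq(1.toByte, 2.toByte))
}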

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
new file mode 100644
index 0000000..dd1b833
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
@@ -0,0 +1,243 @@
+package org.apache.s2graph.core.types
+
+import org.apache.hadoop.hbase.util._
+
+object InnerVal extends HBaseDeserializableWithIsVertexId {
+  import HBaseType._
+
+  val order = Order.DESCENDING
+  val stringLenOffset = 7.toByte
+  val maxStringLen = Byte.MaxValue - stringLenOffset
+  val maxMetaByte = Byte.MaxValue
+  val minMetaByte = 0.toByte
+
+  /** supported data type */
+  val BLOB = "blob"
+  val STRING = "string"
+  val DOUBLE = "double"
+  val FLOAT = "float"
+  val LONG = "long"
+  val INT = "integer"
+  val SHORT = "short"
+  val BYTE = "byte"
+  val NUMERICS = List(DOUBLE, FLOAT, LONG, INT, SHORT, BYTE)
+  val BOOLEAN = "boolean"
+
+  def isNumericType(dataType: String): Boolean = {
+    dataType match {
+      case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => true
+      case _ => false
+    }
+  }
+  def toInnerDataType(dataType: String): String = {
+    dataType match {
+      case "blob" => BLOB
+      case "string" | "str" | "s" => STRING
+      case "double" | "d" | "float64" => DOUBLE
+      case "float" | "f" | "float32" => FLOAT
+      case "long" | "l" | "int64" | "integer64" => LONG
+      case "int" | "integer" | "i" | "int32" | "integer32" => INT
+      case "short" | "int16" | "integer16" => SHORT
+      case "byte" | "b" | "tinyint" | "int8" | "integer8" => BYTE
+      case "boolean" | "bool" => BOOLEAN
+      case _ => throw new RuntimeException(s"can`t convert $dataType into InnerDataType")
+    }
+  }
+
+  def numByteRange(num: BigDecimal) = {
+//    val byteLen =
+//      if (num.isValidByte | num.isValidChar) 1
+//      else if (num.isValidShort) 2
+//      else if (num.isValidInt) 4
+//      else if (num.isValidLong) 8
+//      else if (num.isValidFloat) 4
+//      else 12
+    val byteLen = 12
+    //      else throw new RuntimeException(s"wrong data $num")
+    new SimplePositionedMutableByteRange(byteLen + 4)
+  }
+
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String,
+                isVertexId: Boolean): (InnerValLike, Int) = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, offset, len, version, isVertexId)
+      case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, isVertexId)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withLong(l: Long, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l))
+      case VERSION1 => v1.InnerVal(Some(l), None, None)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withInt(i: Int, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i))
+      case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withFloat(f: Float, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(f.toDouble))
+      case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withDouble(d: Double, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d))
+      case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withNumber(num: BigDecimal, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num)
+      case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withBoolean(b: Boolean, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b)
+      case VERSION1 => v1.InnerVal(None, None, Some(b))
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withBlob(blob: Array[Byte], version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(blob)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+  def withStr(s: String, version: String): InnerValLike = {
+    version match {
+      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s)
+      case VERSION1 => v1.InnerVal(None, Some(s), None)
+      case _ => throw notSupportedEx(version)
+    }
+  }
+
+//  def withInnerVal(innerVal: InnerValLike, version: String): InnerValLike = {
+//    val bytes = innerVal.bytes
+//    version match {
+//      case VERSION2 => v2.InnerVal.fromBytes(bytes, 0, bytes.length, version)._1
+//      case VERSION1 => v1.InnerVal.fromBytes(bytes, 0, bytes.length, version)._1
+//      case _ => throw notSupportedEx(version)
+//    }
+//  }
+
+  /** nasty implementation for backward compatibility */
+  def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: String): InnerValLike = {
+    val ret = toVersion match {
+      case VERSION2 | VERSION3 | VERSION4 =>
+        if (innerVal.isInstanceOf[v1.InnerVal]) {
+          val obj = innerVal.asInstanceOf[v1.InnerVal]
+          obj.valueType match {
+            case "long" => InnerVal.withLong(obj.longV.get, toVersion)
+            case "string" => InnerVal.withStr(obj.strV.get, toVersion)
+            case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion)
+            case _ => throw new Exception(s"InnerVal should be [long/integer/short/byte/string/boolean]")
+          }
+        } else {
+          innerVal
+        }
+      case VERSION1 =>
+        if (innerVal.isInstanceOf[v2.InnerVal]) {
+          val obj = innerVal.asInstanceOf[v2.InnerVal]
+          obj.value match {
+            case str: String => InnerVal.withStr(str, toVersion)
+            case b: Boolean => InnerVal.withBoolean(b, toVersion)
+            case n: BigDecimal => InnerVal.withNumber(n, toVersion)
+            case n: Long => InnerVal.withNumber(n, toVersion)
+            case n: Double => InnerVal.withNumber(n, toVersion)
+            case n: Int => InnerVal.withNumber(n, toVersion)
+            case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion")
+          }
+        } else {
+          innerVal
+        }
+      case _ => throw notSupportedEx(toVersion)
+    }
+//    logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret, 
${innerVal.bytes.toList}, ${ret.bytes.toList}")
+    ret
+  }
+
+}
+
+trait InnerValLike extends HBaseSerializable {
+
+  val value: Any
+
+  def compare(other: InnerValLike): Int
+
+  def +(other: InnerValLike): InnerValLike
+
+  def <(other: InnerValLike) = this.compare(other) < 0
+
+  def <=(other: InnerValLike) = this.compare(other) <= 0
+
+  def >(other: InnerValLike) = this.compare(other) > 0
+
+  def >=(other: InnerValLike) = this.compare(other) >= 0
+
+  override def toString(): String = value.toString
+
+  override def hashCode(): Int = value.hashCode()
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: InnerValLike =>
+        val ret = toString == obj.toString
+//        logger.debug(s"InnerValLike.equals($this, $obj) => $ret")
+        ret
+      case _ => false
+    }
+  }
+  def hashKey(dataType: String): Int
+
+  def toIdString(): String
+
+}
+
+object InnerValLikeWithTs extends HBaseDeserializable {
+  import HBaseType._
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION): (InnerValLikeWithTs, Int) = {
+    val (innerVal, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version)
+    val ts = Bytes.toLong(bytes, offset + numOfBytesUsed)
+    (InnerValLikeWithTs(innerVal, ts), numOfBytesUsed + 8)
+  }
+
+  def withLong(l: Long, ts: Long, version: String): InnerValLikeWithTs = {
+    InnerValLikeWithTs(InnerVal.withLong(l, version), ts)
+  }
+
+  def withStr(s: String, ts: Long, version: String): InnerValLikeWithTs = {
+    InnerValLikeWithTs(InnerVal.withStr(s, version), ts)
+  }
+}
+
+case class InnerValLikeWithTs(innerVal: InnerValLike, ts: Long)
+  extends HBaseSerializable {
+
+  def bytes: Array[Byte] = {
+    Bytes.add(innerVal.bytes, Bytes.toBytes(ts))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala
new file mode 100644
index 0000000..d34299b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/LabelWithDirection.scala
@@ -0,0 +1,61 @@
+package org.apache.s2graph.core.types
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphUtil
+
+object LabelWithDirection {
+
+  import HBaseType._
+
+  def apply(compositeInt: Int): LabelWithDirection = {
+    //      logger.debug(s"CompositeInt: $compositeInt")
+
+    val dir = compositeInt & ((1 << bitsForDir) - 1)
+    val labelId = compositeInt >> bitsForDir
+    LabelWithDirection(labelId, dir)
+  }
+
+  def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = {
+    assert(labelOrderSeq < (1 << 6))
+    val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
+    Array.fill(1)(byte.toByte)
+  }
+
+  def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
+    val byte = bytes(offset)
+    val isInverted = if ((byte & 1) != 0) true else false
+    val labelOrderSeq = byte >> 1
+    (labelOrderSeq.toByte, isInverted)
+  }
+}
+
+case class LabelWithDirection(labelId: Int, dir: Int) extends HBaseSerializable {
+
+  import HBaseType._
+
+  assert(dir < (1 << bitsForDir))
+  assert(labelId < (Int.MaxValue >> bitsForDir))
+
+  lazy val labelBits = labelId << bitsForDir
+
+  lazy val compositeInt = labelBits | dir
+
+  def bytes = {
+     Bytes.toBytes(compositeInt)
+  }
+
+  lazy val dirToggled = LabelWithDirection(labelId, GraphUtil.toggleDir(dir))
+
+  def updateDir(newDir: Int) = LabelWithDirection(labelId, newDir)
+
+  def isDirected = dir == 0 || dir == 1
+
+  override def hashCode(): Int = compositeInt
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case o: LabelWithDirection => hashCode == o.hashCode()
+      case _ => false
+    }
+  }
+}
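
LabelWithDirection stores the label id and the direction in a single Int: the label id is shifted left by bitsForDir (2) and the direction occupies the low two bits. A minimal standalone sketch (not part of the commit) of the same packing and its inverse, as used by compositeInt and LabelWithDirection.apply above:

object LabelWithDirectionPackingSketch extends App {
  val bitsForDir = 2
  def pack(labelId: Int, dir: Int): Int  = (labelId << bitsForDir) | dir
  def unpack(composite: Int): (Int, Int) = (composite >> bitsForDir, composite & ((1 << bitsForDir) - 1))

  assert(unpack(pack(labelId = 7, dir = 1)) == (7, 1))
}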

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
new file mode 100644
index 0000000..79c7122
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -0,0 +1,142 @@
+package org.apache.s2graph.core.types
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.types.HBaseType._
+
+object VertexId extends HBaseDeserializable {
+  import HBaseType._
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION): (VertexId, Int) = {
+    /** since murmur hash is prepended, skip numOfBytes for murmur hash */
+    var pos = offset + GraphUtil.bytesForMurMurHash
+
+    val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
+    pos += numOfBytesUsed
+    val colId = Bytes.toInt(bytes, pos, 4)
+    (VertexId(colId, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4)
+  }
+
+  def apply(colId: Int, innerId: InnerValLike): VertexId = new VertexId(colId, innerId)
+
+  def toSourceVertexId(vid: VertexId) = {
+    SourceVertexId(vid.colId, vid.innerId)
+  }
+
+  def toTargetVertexId(vid: VertexId) = {
+    TargetVertexId(vid.colId, vid.innerId)
+  }
+}
+
+class VertexId protected (val colId: Int, val innerId: InnerValLike) extends HBaseSerializable {
+  val storeHash: Boolean = true
+  val storeColId: Boolean = true
+  lazy val hashBytes =
+//    if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toString))
+    if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toIdString()))
+    else Array.empty[Byte]
+
+  lazy val colIdBytes: Array[Byte] =
+    if (storeColId) Bytes.toBytes(colId)
+    else Array.empty[Byte]
+
+  def bytes: Array[Byte] = Bytes.add(hashBytes, innerId.bytes, colIdBytes)
+
+  override def toString(): String = {
+    colId.toString() + "," + innerId.toString()
+//    s"VertexId($colId, $innerId)"
+  }
+
+  override def hashCode(): Int = {
+    val ret = if (storeColId) {
+      colId * 31 + innerId.hashCode()
+    } else {
+      innerId.hashCode()
+    }
+//    logger.debug(s"VertexId.hashCode: $ret")
+    ret
+  }
+  override def equals(obj: Any): Boolean = {
+    val ret = obj match {
+      case other: VertexId => colId == other.colId && innerId.toIdString() == other.innerId.toIdString()
+      case _ => false
+    }
+//    logger.debug(s"VertexId.equals: $this, $obj => $ret")
+    ret
+  }
+
+  def compareTo(other: VertexId): Int = {
+    Bytes.compareTo(bytes, other.bytes)
+  }
+  def <(other: VertexId): Boolean = compareTo(other) < 0
+  def <=(other: VertexId): Boolean = compareTo(other) <= 0
+  def >(other: VertexId): Boolean = compareTo(other) > 0
+  def >=(other: VertexId): Boolean = compareTo(other) >= 0
+}
+
+object SourceVertexId extends HBaseDeserializable {
+  import HBaseType._
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION): (VertexId, Int) = {
+    /** since murmur hash is prepended, skip numOfBytes for murmur hash */
+    val pos = offset + GraphUtil.bytesForMurMurHash
+    val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
+
+    (SourceVertexId(DEFAULT_COL_ID, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed)
+  }
+
+}
+
+
+case class SourceVertexId(override val colId: Int,
+                          override val innerId: InnerValLike) extends VertexId(colId, innerId) {
+  override val storeColId: Boolean = false
+}
+
+object TargetVertexId extends HBaseDeserializable {
+  import HBaseType._
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION): (VertexId, Int) = {
+    /** the murmur hash is not prepended, so start from offset */
+    val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version, isVertexId = true)
+    (TargetVertexId(DEFAULT_COL_ID, innerId), numOfBytesUsed)
+  }
+}
+
+case class TargetVertexId(override val colId: Int,
+                          override val innerId: InnerValLike)
+  extends  VertexId(colId, innerId) {
+  override val storeColId: Boolean = false
+  override val storeHash: Boolean = false
+
+}
+
+object SourceAndTargetVertexIdPair extends HBaseDeserializable {
+  val delimiter = ":"
+  import HBaseType._
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION): (SourceAndTargetVertexIdPair, Int) = {
+    val pos = offset + GraphUtil.bytesForMurMurHash
+    val (srcId, srcBytesLen) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
+    val (tgtId, tgtBytesLen) = InnerVal.fromBytes(bytes, pos + srcBytesLen, len, version, isVertexId = true)
+    (SourceAndTargetVertexIdPair(srcId, tgtId), GraphUtil.bytesForMurMurHash + srcBytesLen + tgtBytesLen)
+  }
+}
+
+case class SourceAndTargetVertexIdPair(val srcInnerId: InnerValLike, val tgtInnerId: InnerValLike) extends HBaseSerializable {
+  val colId = DEFAULT_COL_ID
+  import SourceAndTargetVertexIdPair._
+  override def bytes: Array[Byte] = {
+    val hashBytes = Bytes.toBytes(GraphUtil.murmur3(srcInnerId + delimiter + tgtInnerId))
+    Bytes.add(hashBytes, srcInnerId.bytes, tgtInnerId.bytes)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala
new file mode 100644
index 0000000..c7b2c73
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/v1/InnerVal.scala
@@ -0,0 +1,224 @@
+package org.apache.s2graph.core.types.v1
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.GraphExceptions
+import org.apache.s2graph.core.GraphExceptions.IllegalDataTypeException
+import org.apache.s2graph.core.types.{HBaseDeserializableWithIsVertexId, HBaseSerializable, HBaseType, InnerValLike}
+
+object InnerVal extends HBaseDeserializableWithIsVertexId {
+  import HBaseType._
+  //  val defaultVal = new InnerVal(None, None, None)
+  val stringLenOffset = 7.toByte
+  val maxStringLen = Byte.MaxValue - stringLenOffset
+  val maxMetaByte = Byte.MaxValue
+  val minMetaByte = 0.toByte
+  /**
+   * first byte encoding rule.
+   * 0 => default
+   * 1 => long
+   * 2 => int
+   * 3 => short
+   * 4 => byte
+   * 5 => true
+   * 6 => false
+   * 7 ~ 127 => string len + 7
+   */
+  val metaByte = Map("default" -> 0, "long" -> 1, "int" -> 2, "short" -> 3,
+    "byte" -> 4, "true" -> 5, "false" -> 6).map {
+    case (k, v) => (k, v.toByte)
+  }
+  val metaByteRev = metaByte.map { case (k, v) => (v.toByte, k) } ++ metaByte.map { case (k, v) => ((-v).toByte, k) }
+
+  def maxIdVal(dataType: String) = {
+    dataType match {
+      case "string" => InnerVal.withStr((0 until (Byte.MaxValue - 
stringLenOffset)).map("~").mkString)
+      case "long" => InnerVal.withLong(Long.MaxValue)
+      case "bool" => InnerVal.withBoolean(true)
+      case _ => throw IllegalDataTypeException(dataType)
+    }
+  }
+
+  def minIdVal(dataType: String) = {
+    dataType match {
+      case "string" => InnerVal.withStr("")
+      case "long" => InnerVal.withLong(1)
+      case "bool" => InnerVal.withBoolean(false)
+      case _ => throw IllegalDataTypeException(dataType)
+    }
+  }
+
+  def fromBytes(bytes: Array[Byte], offset: Int, len: Int, version: String = DEFAULT_VERSION, isVertexId: Boolean = false): (InnerVal, Int) = {
+    var pos = offset
+    //
+    val header = bytes(pos)
+    //      logger.debug(s"${bytes(offset)}: ${bytes.toList.slice(pos, bytes.length)}")
+    pos += 1
+
+    var numOfBytesUsed = 0
+    val (longV, strV, boolV) = metaByteRev.get(header) match {
+      case Some(s) =>
+        s match {
+          case "default" =>
+            (None, None, None)
+          case "true" =>
+            numOfBytesUsed = 0
+            (None, None, Some(true))
+          case "false" =>
+            numOfBytesUsed = 0
+            (None, None, Some(false))
+          case "byte" =>
+            numOfBytesUsed = 1
+            val b = bytes(pos)
+            val value = if (b >= 0) Byte.MaxValue - b else Byte.MinValue - b - 1
+            (Some(value.toLong), None, None)
+          case "short" =>
+            numOfBytesUsed = 2
+            val b = Bytes.toShort(bytes, pos, 2)
+            val value = if (b >= 0) Short.MaxValue - b else Short.MinValue - b - 1
+            (Some(value.toLong), None, None)
+          case "int" =>
+            numOfBytesUsed = 4
+            val b = Bytes.toInt(bytes, pos, 4)
+            val value = if (b >= 0) Int.MaxValue - b else Int.MinValue - b - 1
+            (Some(value.toLong), None, None)
+          case "long" =>
+            numOfBytesUsed = 8
+            val b = Bytes.toLong(bytes, pos, 8)
+            val value = if (b >= 0) Long.MaxValue - b else Long.MinValue - b - 1
+            (Some(value.toLong), None, None)
+        }
+      case _ => // string
+        val strLen = header - stringLenOffset
+        numOfBytesUsed = strLen
+        (None, Some(Bytes.toString(bytes, pos, strLen)), None)
+    }
+
+    (InnerVal(longV, strV, boolV), numOfBytesUsed + 1)
+  }
+
+  def withLong(l: Long): InnerVal = {
+    //      if (l < 0) throw new IllegalDataRangeException("value should be >= 0")
+    InnerVal(Some(l), None, None)
+  }
+
+  def withStr(s: String): InnerVal = {
+    InnerVal(None, Some(s), None)
+  }
+
+  def withBoolean(b: Boolean): InnerVal = {
+    InnerVal(None, None, Some(b))
+  }
+
+  /**
+   * In natural order
+   * -129, -128 , -2, -1 < 0 < 1, 2, 127, 128
+   *
+   * In byte order
+   * 0 < 1, 2, 127, 128 < -129, -128, -2, -1
+   *
+   */
+  def transform(l: Long): (Byte, Array[Byte]) = {
+    if (Byte.MinValue <= l && l <= Byte.MaxValue) {
+      //        val value = if (l < 0) l - Byte.MinValue else l + Byte.MinValue
+      val key = if (l >= 0) metaByte("byte") else -metaByte("byte")
+      val value = if (l >= 0) Byte.MaxValue - l else Byte.MinValue - l - 1
+      val valueBytes = Array.fill(1)(value.toByte)
+      (key.toByte, valueBytes)
+    } else if (Short.MinValue <= l && l <= Short.MaxValue) {
+      val key = if (l >= 0) metaByte("short") else -metaByte("short")
+      val value = if (l >= 0) Short.MaxValue - l else Short.MinValue - l - 1
+      val valueBytes = Bytes.toBytes(value.toShort)
+      (key.toByte, valueBytes)
+    } else if (Int.MinValue <= l && l <= Int.MaxValue) {
+      val key = if (l >= 0) metaByte("int") else -metaByte("int")
+      val value = if (l >= 0) Int.MaxValue - l else Int.MinValue - l - 1
+      val valueBytes = Bytes.toBytes(value.toInt)
+      (key.toByte, valueBytes)
+    } else if (Long.MinValue <= l && l <= Long.MaxValue) {
+      val key = if (l >= 0) metaByte("long") else -metaByte("long")
+      val value = if (l >= 0) Long.MaxValue - l else Long.MinValue - l - 1
+      val valueBytes = Bytes.toBytes(value.toLong)
+      (key.toByte, valueBytes)
+    } else {
+      throw new Exception(s"InnerVal range is out: $l")
+    }
+  }
+}
+
+case class InnerVal(longV: Option[Long], strV: Option[String], boolV: Option[Boolean])
+  extends HBaseSerializable with InnerValLike {
+
+  import InnerVal._
+
+  lazy val isDefault = longV.isEmpty && strV.isEmpty && boolV.isEmpty
+  val value = (longV, strV, boolV) match {
+    case (Some(l), None, None) => l
+    case (None, Some(s), None) => s
+    case (None, None, Some(b)) => b
+    case _ => throw new Exception(s"InnerVal should be [long/integer/short/byte/string/boolean]")
+  }
+  def valueType = (longV, strV, boolV) match {
+    case (Some(l), None, None) => "long"
+    case (None, Some(s), None) => "string"
+    case (None, None, Some(b)) => "boolean"
+    case _ => throw new Exception(s"InnerVal should be [long/integer/short/byte/string/boolean]")
+  }
+
+  def compare(other: InnerValLike): Int = {
+    if (!other.isInstanceOf[InnerVal]) {
+      throw new RuntimeException(s"compare between $this vs $other is not supported")
+    } else {
+//      (value, other.value) match {
+//        case (v1: Long, v2: Long) => v1.compare(v2)
+//        case (b1: Boolean, b2: Boolean) => b1.compare(b2)
+//        case (s1: String, s2: String) => s1.compare(s2)
+//        case _ => throw new Exception("Please check a type of the compare operands")
+//      }
+      Bytes.compareTo(bytes, other.bytes) * -1
+    }
+  }
+
+  def +(other: InnerValLike) = {
+    if (!other.isInstanceOf[InnerVal]) {
+      throw new RuntimeException(s"+ between $this vs $other is not supported")
+    } else {
+      (value, other.value) match {
+        case (v1: Long, v2: Long) => InnerVal.withLong(v1 + v2)
+        case (b1: Boolean, b2: Boolean) => InnerVal.withBoolean(if (b2) !b1 else b1)
+        case _ => throw new Exception("Please check the types of the incr operands")
+      }
+    }
+  }
+
+  def bytes = {
+    val (meta, valBytes) = (longV, strV, boolV) match {
+      case (None, None, None) =>
+        (metaByte("default"), Array.empty[Byte])
+      case (Some(l), None, None) =>
+        transform(l)
+      case (None, None, Some(b)) =>
+        val meta = if (b) metaByte("true") else metaByte("false")
+        (meta, Array.empty[Byte])
+      case (None, Some(s), None) =>
+        val sBytes = Bytes.toBytes(s)
+        if (sBytes.length > maxStringLen) {
+          throw new IllegalDataTypeException(s"string in innerVal maxSize is $maxStringLen, given ${sBytes.length}")
+        }
+        assert(sBytes.length <= maxStringLen)
+        val meta = (stringLenOffset + sBytes.length).toByte
+        (meta, sBytes)
+      case _ => throw new IllegalDataTypeException("innerVal data type should be [long/string/bool]")
+    }
+    Bytes.add(Array.fill(1)(meta.toByte), valBytes)
+  }
+
+  override def toString(): String = {
+    value.toString
+  }
+  override def hashKey(dataType: String): Int = {
+    value.toString.hashCode()
+  }
+  override def toIdString(): String = {
+    value.toString
+  }
+}
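
A minimal usage sketch (illustrative only, not part of the patch) for the v1 InnerVal above. It assumes the companion object and case class defined in this file are in scope; the concrete values are arbitrary.

    // Factory helpers populate exactly one of the three Option slots.
    val small = InnerVal.withLong(10L)
    val big   = InnerVal.withLong(20L)
    val name  = InnerVal.withStr("s2graph")

    // bytes prepends a one-byte meta header; for numerics, transform() stores
    // MaxValue - value so that larger numbers produce smaller byte arrays.
    val encoded: Array[Byte] = small.bytes

    // compare negates the raw byte comparison, recovering natural order for
    // numerics that share the same meta header.
    assert(big.compare(small) > 0)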

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
new file mode 100644
index 0000000..a511f17
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/v2/InnerVal.scala
@@ -0,0 +1,160 @@
+package org.apache.s2graph.core.types.v2
+
+import org.apache.hadoop.hbase.util._
+import org.apache.s2graph.core.types
+import org.apache.s2graph.core.types.{HBaseDeserializableWithIsVertexId, HBaseSerializable, HBaseType, InnerValLike}
+
+object InnerVal extends HBaseDeserializableWithIsVertexId {
+
+  import HBaseType._
+
+  val order = Order.DESCENDING
+
+  def fromBytes(bytes: Array[Byte],
+                offset: Int,
+                len: Int,
+                version: String = DEFAULT_VERSION,
+                isVertexId: Boolean = false): (InnerVal, Int) = {
+    val pbr = new SimplePositionedByteRange(bytes)
+    pbr.setPosition(offset)
+    val startPos = pbr.getPosition
+    if (bytes(offset) == -1 || bytes(offset) == 0) {
+      /** simple boolean */
+      val boolean = order match {
+        case Order.DESCENDING => bytes(offset) == 0
+        case _ => bytes(offset) == -1
+      }
+      (InnerVal(boolean), 1)
+    }
+    else {
+      if (OrderedBytes.isNumeric(pbr)) {
+        val numeric = OrderedBytes.decodeNumericAsBigDecimal(pbr)
+        if (isVertexId) (InnerVal(numeric.longValue()), pbr.getPosition - startPos)
+        else (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos)
+//        (InnerVal(numeric.doubleValue()), pbr.getPosition - startPos)
+//        (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos)
+      } else if (OrderedBytes.isText(pbr)) {
+        val str = OrderedBytes.decodeString(pbr)
+        (InnerVal(str), pbr.getPosition - startPos)
+      } else if (OrderedBytes.isBlobVar(pbr)) {
+        val blobVar = OrderedBytes.decodeBlobVar(pbr)
+        (InnerVal(blobVar), pbr.getPosition - startPos)
+      } else {
+        throw new RuntimeException(s"unsupported InnerVal byte encoding at offset $offset")
+      }
+    }
+  }
+}
+
+case class InnerVal(value: Any) extends HBaseSerializable with InnerValLike {
+
+  import types.InnerVal._
+
+  def bytes: Array[Byte] = {
+    val ret = value match {
+      case b: Boolean =>
+
+        /** since OrderedBytes headers start from 0x05, it is safe to use -1 and 0
+          * for decreasing order (true, false) */
+        //        Bytes.toBytes(b)
+        order match {
+          case Order.DESCENDING => if (b) Array(0.toByte) else Array(-1.toByte)
+          case _ => if (!b) Array(0.toByte) else Array(-1.toByte)
+        }
+      case d: Double =>
+        val num = BigDecimal(d)
+        val pbr = numByteRange(num)
+        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
+        pbr.getBytes().take(len)
+      case l: Long =>
+        val num = BigDecimal(l)
+        val pbr = numByteRange(num)
+        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
+        pbr.getBytes().take(len)
+      case i: Int =>
+        val num = BigDecimal(i)
+        val pbr = numByteRange(num)
+        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
+        pbr.getBytes().take(len)
+      case sh: Short =>
+        val num = BigDecimal(sh)
+        val pbr = numByteRange(num)
+        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
+        pbr.getBytes().take(len)
+      case b: Byte =>
+        val num = BigDecimal(b)
+        val pbr = numByteRange(num)
+        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
+        pbr.getBytes().take(len)
+
+
+      case b: BigDecimal =>
+        val pbr = numByteRange(b)
+        val len = OrderedBytes.encodeNumeric(pbr, b.bigDecimal, order)
+        pbr.getBytes().take(len)
+      case s: String =>
+        val pbr = new SimplePositionedMutableByteRange(s.getBytes.length + 3)
+        val len = OrderedBytes.encodeString(pbr, s, order)
+        pbr.getBytes().take(len)
+      case blob: Array[Byte] =>
+        val len = OrderedBytes.blobVarEncodedLength(blob.length)
+        val pbr = new SimplePositionedMutableByteRange(len)
+        val totalLen = OrderedBytes.encodeBlobVar(pbr, blob, order)
+        pbr.getBytes().take(totalLen)
+    }
+    //    println(s"$value => ${ret.toList}, ${ret.length}")
+    ret
+  }
+
+//
+  override def hashKey(dataType: String): Int = {
+    if (value.isInstanceOf[String]) {
+      // since we use a dummy string value for the degree edge.
+      value.toString.hashCode()
+    } else {
+      dataType match {
+        case BYTE => value.asInstanceOf[BigDecimal].bigDecimal.byteValue().hashCode()
+        case FLOAT => value.asInstanceOf[BigDecimal].bigDecimal.floatValue().hashCode()
+        case DOUBLE => value.asInstanceOf[BigDecimal].bigDecimal.doubleValue().hashCode()
+        case LONG => value.asInstanceOf[BigDecimal].bigDecimal.longValue().hashCode()
+        case INT => value.asInstanceOf[BigDecimal].bigDecimal.intValue().hashCode()
+        case SHORT => value.asInstanceOf[BigDecimal].bigDecimal.shortValue().hashCode()
+        case STRING => value.toString.hashCode
+        case _ => throw new RuntimeException(s"Not supported type: $dataType")
+      }
+    }
+  }
+
+  def compare(other: InnerValLike): Int = {
+    if (!other.isInstanceOf[InnerVal])
+      throw new RuntimeException(s"compare $this vs $other")
+    Bytes.compareTo(bytes, other.bytes) * -1
+  }
+
+  def +(other: InnerValLike): InnerValLike = {
+    if (!other.isInstanceOf[InnerVal])
+      throw new RuntimeException(s"+ $this, $other")
+
+    (value, other.value) match {
+      case (v1: BigDecimal, v2: BigDecimal) => new InnerVal(BigDecimal(v1.bigDecimal.add(v2.bigDecimal)))
+      case _ => throw new RuntimeException("+ operation on InnerVal is only supported for a BigDecimal pair")
+    }
+  }
+
+  //need to be removed ??
+  override def toString(): String = {
+//    value.toString()
+    value match {
+      case n: BigDecimal => n.bigDecimal.toPlainString
+      case _ => value.toString
+    }
+  }
+
+  override def toIdString(): String = {
+    value match {
+      case n: BigDecimal => n.bigDecimal.longValue().toString()
+      case _ => value.toString
+    }
+  }
+
+}
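
A round-trip sketch (illustrative only, not part of the patch) for the OrderedBytes-based v2 InnerVal above, using the default version and isVertexId = false from the fromBytes signature.

    import org.apache.s2graph.core.types.v2.InnerVal

    // Numbers are encoded with OrderedBytes.encodeNumeric in DESCENDING order.
    val num = InnerVal(BigDecimal(42))
    val encoded: Array[Byte] = num.bytes

    // fromBytes returns the decoded InnerVal plus the number of bytes consumed,
    // so several values can be decoded back-to-back from a single row key.
    val (decoded, bytesUsed) = InnerVal.fromBytes(encoded, 0, encoded.length)
    // decoded.value holds the numeric value again as a BigDecimal; here bytesUsed == encoded.length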

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
new file mode 100644
index 0000000..59452df
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
@@ -0,0 +1,82 @@
+package org.apache.s2graph.core.utils
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.CacheBuilder
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.Config
+
+import scala.concurrent.ExecutionContext
+
+class DeferCache[R](config: Config)(implicit ex: ExecutionContext) {
+
+  import Extensions.DeferOps
+
+  type Value = (Long, Deferred[R])
+
+  private val maxSize = config.getInt("future.cache.max.size")
+  private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
+  private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  private val futureCache = CacheBuilder.newBuilder()
+  .initialCapacity(maxSize)
+  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+  .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+  .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
+  .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]()
+
+
+  def asMap() = futureCache.asMap()
+
+  def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey)
+
+  private def checkAndExpire(cacheKey: Long,
+                             cachedAt: Long,
+                             cacheTTL: Long,
+                             oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = {
+    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
+      // future is too old, so expire it and fetch new data from storage.
+      futureCache.asMap().remove(cacheKey)
+
+      val newPromise = new Deferred[R]()
+      val now = System.currentTimeMillis()
+
+      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
+        case null =>
+          // only one thread succeeds in getting here concurrently;
+          // initiate a fetch to storage, then add a callback on completion to finish the promise.
+          op withCallback { value =>
+            newPromise.callback(value)
+            value
+          }
+          newPromise
+        case (cachedAt, oldDefer) => oldDefer
+      }
+    } else {
+      // future is not too old, so reuse it.
+      oldDefer
+    }
+  }
+  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = {
+    val cacheVal = futureCache.getIfPresent(cacheKey)
+    cacheVal match {
+      case null =>
+        val promise = new Deferred[R]()
+        val now = System.currentTimeMillis()
+        val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
+          case null =>
+            op.withCallback { value =>
+              promise.callback(value)
+              value
+            }
+            (now, promise)
+          case oldVal => oldVal
+        }
+        checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op)
+
+      case (cachedAt, defer) =>
+        checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op)
+    }
+  }
+}
+
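
A usage sketch for DeferCache (illustrative only, not part of the patch). It assumes only the three future.cache.* keys the class reads above; the config values, cache key, and payload are arbitrary.

    import com.stumbleupon.async.Deferred
    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.utils.DeferCache
    import scala.concurrent.ExecutionContext.Implicits.global

    val config = ConfigFactory.parseString(
      """future.cache.max.size = 1000
        |future.cache.expire.after.write = 10000
        |future.cache.expire.after.access = 10000
        |""".stripMargin)

    val cache = new DeferCache[Long](config)

    // op is evaluated only when there is no live entry, or the cached entry is
    // older than the per-call TTL; concurrent callers share the same Deferred.
    val deferred: Deferred[Long] = cache.getOrElseUpdate(cacheKey = 1L, cacheTTL = 5000L) {
      Deferred.fromResult(42L)
    }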

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala
new file mode 100644
index 0000000..5805c32
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Extentions.scala
@@ -0,0 +1,73 @@
+package org.apache.s2graph.core.utils
+
+import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+object Extensions {
+
+
+  def retryOnSuccess[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(shouldStop: T => Boolean)(implicit ex: ExecutionContext): Future[T] = n match {
+    case i if n <= maxRetryNum =>
+      fn.flatMap { result =>
+        if (!shouldStop(result)) {
+          logger.info(s"retryOnSuccess $n")
+          retryOnSuccess(maxRetryNum, n + 1)(fn)(shouldStop)
+        } else {
+          Future.successful(result)
+        }
+      }
+    case _ => fn
+  }
+
+  def retryOnFailure[T](maxRetryNum: Int, n: Int = 1)(fn: => Future[T])(fallback: => T)(implicit ex: ExecutionContext): Future[T] = n match {
+    case i if n <= maxRetryNum =>
+      fn recoverWith { case t: Throwable =>
+        logger.info(s"retryOnFailure $n $t")
+        retryOnFailure(maxRetryNum, n + 1)(fn)(fallback)
+      }
+    case _ =>
+      Future.successful(fallback)
+  }
+
+
+  implicit class DeferOps[T](d: Deferred[T])(implicit ex: ExecutionContext) {
+
+    def withCallback[R](op: T => R): Deferred[R] = {
+      d.addCallback(new Callback[R, T] {
+        override def call(arg: T): R = op(arg)
+      })
+    }
+
+    def recoverWith(op: Exception => T): Deferred[T] = {
+      d.addErrback(new Callback[Deferred[T], Exception] {
+        override def call(e: Exception): Deferred[T] = Deferred.fromResult(op(e))
+      })
+    }
+
+
+    def toFuture: Future[T] = {
+      val promise = Promise[T]
+
+      d.addBoth(new Callback[Unit, T] {
+        def call(arg: T) = arg match {
+          case e: Exception => promise.failure(e)
+          case _ => promise.success(arg)
+        }
+      })
+
+      promise.future
+    }
+
+    def toFutureWith(fallback: => T): Future[T] = {
+      toFuture recoverWith { case t: Throwable => Future.successful(fallback) }
+    }
+
+  }
+
+  implicit class ConfigOps(config: Config) {
+    def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean =
+      if (config.hasPath(key)) config.getBoolean(key) else defaultValue
+  }
+}
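
A sketch of the retry helper and the Deferred-to-Future bridge above (illustrative only, not part of the patch; riskyCall is a hypothetical Future-returning operation).

    import com.stumbleupon.async.Deferred
    import org.apache.s2graph.core.utils.Extensions
    import org.apache.s2graph.core.utils.Extensions.DeferOps
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future

    def riskyCall(): Future[Int] = Future.successful(1)   // placeholder for any flaky call

    // Retry up to three times on failure, then fall back to -1.
    val withRetry: Future[Int] =
      Extensions.retryOnFailure(maxRetryNum = 3)(riskyCall())(fallback = -1)

    // Bridge an async Deferred into a Scala Future via the implicit DeferOps.
    val asFuture: Future[Long] = Deferred.fromResult(1L).toFuture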

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala
new file mode 100644
index 0000000..b23566e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala
@@ -0,0 +1,82 @@
+package org.apache.s2graph.core.utils
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.cache.CacheBuilder
+import com.typesafe.config.Config
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+
+class FutureCache[R](config: Config)(implicit ex: ExecutionContext) {
+
+  type Value = (Long, Future[R])
+
+  private val maxSize = config.getInt("future.cache.max.size")
+  private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
+  private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
+
+  private val futureCache = CacheBuilder.newBuilder()
+  .initialCapacity(maxSize)
+  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+  .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+  .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
+  .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]()
+
+
+  def asMap() = futureCache.asMap()
+
+  def getIfPresent(cacheKey: Long): Value = {
+    val (cachedAt, promise) = futureCache.getIfPresent(cacheKey)
+    (cachedAt, promise.future)
+  }
+
+  private def checkAndExpire(cacheKey: Long,
+                             cachedAt: Long,
+                             cacheTTL: Long,
+                             oldFuture: Future[R])(op: => Future[R]): Future[R] = {
+    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
+      // future is too old, so expire it and fetch new data from storage.
+      futureCache.asMap().remove(cacheKey)
+
+      val newPromise = Promise[R]
+      val now = System.currentTimeMillis()
+
+      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
+        case null =>
+          // only one thread succeeds in getting here concurrently;
+          // initiate a fetch to storage, then add a callback on completion to finish the promise.
+          op.onSuccess { case value =>
+            newPromise.success(value)
+            value
+          }
+          newPromise.future
+        case (cachedAt, oldPromise) => oldPromise.future
+      }
+    } else {
+      // future is not too old, so reuse it.
+      oldFuture
+    }
+  }
+  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = {
+    val cacheVal = futureCache.getIfPresent(cacheKey)
+    cacheVal match {
+      case null =>
+        val promise = Promise[R]
+        val now = System.currentTimeMillis()
+        val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
+          case null =>
+            op.onSuccess { case value =>
+              promise.success(value)
+              value
+            }
+            (now, promise)
+          case oldVal => oldVal
+        }
+        checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op)
+
+      case (cachedAt, cachedPromise) =>
+        checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op)
+    }
+  }
+}
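
The Future-based twin of DeferCache; a short sketch (illustrative only, not part of the patch; expensiveFetch is a stand-in for the real storage call).

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.utils.FutureCache
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    def expensiveFetch(): Int = 7   // placeholder for the real lookup

    val fcache = new FutureCache[Int](ConfigFactory.parseString(
      """future.cache.max.size = 1000
        |future.cache.expire.after.write = 10000
        |future.cache.expire.after.access = 10000
        |""".stripMargin))

    // Concurrent callers with the same key share one Future until it expires.
    val f: Future[Int] = fcache.getOrElseUpdate(cacheKey = 7L, cacheTTL = 5000L) {
      Future(expensiveFetch())
    }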

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
new file mode 100644
index 0000000..6b02a00
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -0,0 +1,44 @@
+package org.apache.s2graph.core.utils
+
+import org.slf4j.LoggerFactory
+import play.api.libs.json.JsValue
+
+import scala.language.{higherKinds, implicitConversions}
+
+object logger {
+
+  trait Loggable[T] {
+    def toLogMessage(msg: T): String
+  }
+
+  object Loggable {
+    implicit val stringLoggable = new Loggable[String] {
+      def toLogMessage(msg: String) = msg
+    }
+
+    implicit def numericLoggable[T: Numeric] = new Loggable[T] {
+      def toLogMessage(msg: T) = msg.toString
+    }
+
+    implicit val jsonLoggable = new Loggable[JsValue] {
+      def toLogMessage(msg: JsValue) = msg.toString()
+    }
+
+    implicit val booleanLoggable = new Loggable[Boolean] {
+      def toLogMessage(msg: Boolean) = msg.toString()
+    }
+  }
+
+  private val logger = LoggerFactory.getLogger("application")
+  private val errorLogger = LoggerFactory.getLogger("error")
+
+  def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def debug[T: Loggable](msg: => T) = logger.debug(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
+
+  def error[T: Loggable](msg: => T) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg))
+}
+
+
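
The logger object above resolves a Loggable instance per message type and evaluates messages lazily; a short sketch (illustrative only, message contents are arbitrary):

    import org.apache.s2graph.core.utils.logger
    import play.api.libs.json.Json

    logger.info("fetch started")                          // String instance
    logger.info(1234L)                                    // any Numeric type
    logger.info(Json.obj("edges" -> 10))                  // JsValue instance
    logger.error("fetch failed", new RuntimeException("boom"))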

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
new file mode 100644
index 0000000..3c67e42
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -0,0 +1,62 @@
+package org.apache.s2graph.core.utils
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.cache.CacheBuilder
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success}
+
+object SafeUpdateCache {
+
+  case class CacheKey(key: String)
+
+}
+
+class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit executionContext: ExecutionContext) {
+
+  import SafeUpdateCache._
+
+  implicit class StringOps(key: String) {
+    def toCacheKey = new CacheKey(prefix + ":" + key)
+  }
+
+  def toTs() = (System.currentTimeMillis() / 1000).toInt
+
+  private val cache = CacheBuilder.newBuilder().maximumSize(maxSize).build[CacheKey, (T, Int, AtomicBoolean)]()
+
+  def put(key: String, value: T) = cache.put(key.toCacheKey, (value, toTs, new AtomicBoolean(false)))
+
+  def invalidate(key: String) = cache.invalidate(key.toCacheKey)
+
+  def withCache(key: String)(op: => T): T = {
+    val cacheKey = key.toCacheKey
+    val cachedValWithTs = cache.getIfPresent(cacheKey)
+
+    if (cachedValWithTs == null) {
+      // fetch and update cache.
+      val newValue = op
+      cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false)))
+      newValue
+    } else {
+      val (cachedVal, updatedAt, isUpdating) = cachedValWithTs
+      if (toTs() < updatedAt + ttl) cachedVal // in cache TTL
+      else {
+        val running = isUpdating.getAndSet(true)
+        if (running) cachedVal
+        else {
+          Future(op)(executionContext) onComplete {
+            case Failure(ex) =>
+              cache.put(cacheKey, (cachedVal, toTs(), new AtomicBoolean(false))) // keep old value
+              logger.error(s"withCache update failed: $cacheKey")
+            case Success(newValue) =>
+              cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) // update new value
+              logger.info(s"withCache update success: $cacheKey")
+          }
+          cachedVal
+        }
+      }
+    }
+  }
+}
+
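
A sketch of SafeUpdateCache (illustrative only, not part of the patch; loadLabel is a hypothetical blocking lookup and the size/TTL values are arbitrary). Stale entries are refreshed in the background while callers keep receiving the previous value.

    import org.apache.s2graph.core.utils.SafeUpdateCache
    import scala.concurrent.ExecutionContext.Implicits.global

    def loadLabel(id: String): Option[String] = Some(id)   // placeholder for the real lookup

    // prefix namespaces the keys; ttl is in seconds (toTs uses epoch seconds).
    val labelCache = new SafeUpdateCache[Option[String]]("label", maxSize = 10000, ttl = 60)

    val label = labelCache.withCache("some-label-id") {
      loadLabel("some-label-id")
    }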
