http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
deleted file mode 100644
index 46ad15f..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.indexedge.tall
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, 
StorageSerializable}
-import com.kakao.s2graph.core.types.VertexId
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
-import org.apache.hadoop.hbase.util.Bytes
-
-
/**
 * Serializes an [[IndexEdge]] into exactly one [[SKeyValue]] using the "tall" layout:
 * all identifying bytes are packed into the row key and the qualifier is always empty.
 *
 * Row key : source vertex id ++ labelWithDir ++ (label index seq, isInverted = false)
 *           ++ default op byte ++ suffix
 * Suffix  : empty for degree edges; otherwise the index props, followed by the target
 *           vertex id when the index props do not contain `LabelMeta.toSeq`
 * Value   : the degree / count property as a long, otherwise the encoded meta props
 *
 * @param indexEdge the edge to serialize
 */
class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
  import StorageSerializable._

  val label = indexEdge.label
  // Encode the table name as UTF-8 explicitly; String.getBytes() without a charset
  // depends on the JVM's platform default encoding.
  val table = Bytes.toBytes(label.hbaseTableName)
  val cf = Serializable.edgeCf

  val idxPropsMap = indexEdge.orders.toMap
  val idxPropsBytes = propsToBytes(indexEdge.orders)

  /** Builds the single key/value cell that represents `indexEdge`. */
  override def toKeyValues: Seq[SKeyValue] = {
    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
    val labelWithDirBytes = indexEdge.labelWithDir.bytes
    val labelIndexSeqWithIsInvertedBytes =
      labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)

    val rowPrefix = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)

    // The target vertex id is only appended when it is not already part of the index props.
    val rowSuffix =
      if (indexEdge.degreeEdge) Array.empty[Byte]
      else if (idxPropsMap.contains(LabelMeta.toSeq)) idxPropsBytes
      else Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)

    /** TODO search usage of op byte. if there is no, then remove opByte */
    val rowBytes = Bytes.add(rowPrefix, Array(GraphUtil.defaultOpByte), rowSuffix)
    val qualifierBytes = Array.empty[Byte]

    val value =
      if (indexEdge.degreeEdge)
        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
      else propsToKeyValues(indexEdge.metas.toSeq)

    Seq(SKeyValue(table, rowBytes, cf, qualifierBytes, value, indexEdge.version))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
deleted file mode 100644
index f83dd1f..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.indexedge.wide
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.StorageDeserializable._
-import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, 
SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
-
/**
 * Rebuilds an [[IndexEdge]] from one cell written in the "wide" layout, where the index
 * props (and optionally the target vertex id and an op byte) live in the qualifier.
 *
 * @param bytesToLongFunc decoder used to read the degree / count value out of the cell value
 */
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
  import StorageDeserializable._

  /** (index props, target vertex id, op, whether a target id was read from the qualifier, bytes consumed) */
  type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
  /** (props decoded from the value, end position) */
  type ValueRaw = (Array[(Byte, InnerValLike)], Int)

  /** Degree cells have an empty qualifier: the degree itself is read from the cell value. */
  private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
    val degree = bytesToLongFunc(kv.value, 0)
    val degreeProps = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
    val placeholderTgtId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
    (degreeProps, placeholderTgtId, GraphUtil.operations("insert"), false, 0)
  }

  /** Reads index props, then an optional target vertex id, then an optional trailing op byte. */
  private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
    val qualifier = kv.qualifier
    val (idxPropsRaw, propsEnd) = bytesToProps(qualifier, 0, version)

    // Nothing left after the props means the target id was not written into the qualifier.
    val (tgtVertexIdRaw, tgtVertexIdLen) =
      if (propsEnd == qualifier.length) (HBaseType.defaultTgtVertexId, 0)
      else TargetVertexId.fromBytes(qualifier, propsEnd, qualifier.length, version)

    val consumed = propsEnd + tgtVertexIdLen
    val (op, opLen) =
      if (qualifier.length == consumed) (GraphUtil.defaultOpByte, 0)
      else (qualifier(consumed), 1)

    (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, consumed + opLen)
  }

  /** Decodes the key/value encoded props stored in the cell value. */
  private def parseValue(kv: SKeyValue, version: String): ValueRaw =
    bytesToKeyValues(kv.value, 0, kv.value.length, version)

  /** Degree cells carry no extra props in their value. */
  private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw =
    (Array.empty[(Byte, InnerValLike)], 0)

  /**
   * Decodes the single cell in `_kvs` into an [[IndexEdge]].
   *
   * @param queryParam      supplies the label whose `indicesMap` resolves the index seq
   * @param _kvs            must contain exactly one element (asserted)
   * @param version         schema version handed to the byte decoders
   * @param cacheElementOpt when defined, its source vertex id, labelWithDir and index seq
   *                        are reused instead of parsing the row key
   * @throws RuntimeException when the index seq is not present in the label's `indicesMap`
   */
  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
                                                   _kvs: Seq[T],
                                                   version: String,
                                                   cacheElementOpt: Option[IndexEdge]): IndexEdge = {
    assert(_kvs.size == 1)

    val converter = implicitly[CanSKeyValue[T]]
    val kv = _kvs.map { raw => converter.toSKeyValue(raw) }.head

    val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt match {
      case Some(cached) => (cached.srcVertex.id, cached.labelWithDir, cached.labelIndexSeq, false, 0)
      case None => parseRow(kv, version)
    }

    val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
      if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, version)
      else parseQualifier(kv, version)

    val (props, _) =
      if (op == GraphUtil.operations("incrementCount")) {
        val countVal = bytesToLongFunc(kv.value, 0)
        (Array(LabelMeta.countSeq -> InnerVal.withLong(countVal, version)), 8)
      } else if (kv.qualifier.isEmpty) {
        parseDegreeValue(kv, version)
      } else {
        parseValue(kv, version)
      }

    val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq,
      throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))

    // Index props are stored without keys; pair each value with the index's meta seq,
    // except for the degree entry which keeps its own key.
    val idxProps = index.metaSeqs.zip(idxPropsRaw).map { case (seq, (k, v)) =>
      if (k == LabelMeta.degreeSeq) k -> v
      else seq -> v
    }

    val idxPropsMap = idxProps.toMap
    val tgtVertexId =
      if (!tgtVertexIdInQualifier) tgtVertexIdRaw
      else idxPropsMap.get(LabelMeta.toSeq) match {
        case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId)
        case None => tgtVertexIdRaw
      }

    // Make sure a timestamp prop is always present, defaulting to the cell timestamp.
    val propsWithoutDefaultTs = (idxProps ++ props).toMap
    val mergedProps =
      if (propsWithoutDefaultTs.contains(LabelMeta.timeStampSeq)) propsWithoutDefaultTs
      else propsWithoutDefaultTs + (LabelMeta.timeStampSeq -> InnerVal.withLong(kv.timestamp, version))

    val ts = kv.timestamp
    IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, mergedProps)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
deleted file mode 100644
index 716b6fb..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.indexedge.wide
-
-import com.kakao.s2graph.core.mysqls.LabelMeta
-import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, 
StorageSerializable}
-import com.kakao.s2graph.core.types.VertexId
-import com.kakao.s2graph.core.{GraphUtil, IndexEdge}
-import org.apache.hadoop.hbase.util.Bytes
-
-
/**
 * Serializes an [[IndexEdge]] into exactly one [[SKeyValue]] using the "wide" layout:
 * the row key identifies the source vertex / label / index and the qualifier carries
 * the index props.
 *
 * Row key   : source vertex id ++ labelWithDir ++ (label index seq, isInverted = false)
 * Qualifier : empty for degree edges;
 *             index props ++ target vertex id ++ op byte for "incrementCount";
 *             otherwise the index props, followed by the target vertex id when the
 *             index props do not contain `LabelMeta.toSeq`
 * Value     : the degree / count property as a long, otherwise the encoded meta props
 *
 * @param indexEdge the edge to serialize
 */
class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
  import StorageSerializable._

  val label = indexEdge.label
  // Encode the table name as UTF-8 explicitly; String.getBytes() without a charset
  // depends on the JVM's platform default encoding.
  val table = Bytes.toBytes(label.hbaseTableName)
  val cf = Serializable.edgeCf

  val idxPropsMap = indexEdge.orders.toMap
  val idxPropsBytes = propsToBytes(indexEdge.orders)

  /** Builds the single key/value cell that represents `indexEdge`. */
  override def toKeyValues: Seq[SKeyValue] = {
    val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
    val labelWithDirBytes = indexEdge.labelWithDir.bytes
    val labelIndexSeqWithIsInvertedBytes =
      labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)

    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
    // Computed up front, exactly as before, even though degree edges do not use it.
    val tgtIdBytes = VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes

    val qualifier =
      if (indexEdge.degreeEdge) Array.empty[Byte]
      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
        Bytes.add(idxPropsBytes, tgtIdBytes, Array(indexEdge.op))
      else if (idxPropsMap.contains(LabelMeta.toSeq)) idxPropsBytes
      else Bytes.add(idxPropsBytes, tgtIdBytes)

    val value =
      if (indexEdge.degreeEdge)
        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.degreeSeq).innerVal.toString().toLong)
      else if (indexEdge.op == GraphUtil.operations("incrementCount"))
        Bytes.toBytes(indexEdge.propsWithTs(LabelMeta.countSeq).innerVal.toString().toLong)
      else propsToKeyValues(indexEdge.metas.toSeq)

    Seq(SKeyValue(table, row, cf, qualifier, value, indexEdge.version))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
deleted file mode 100644
index c97bed6..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
-
-import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
-import com.kakao.s2graph.core.storage.StorageDeserializable._
-import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue}
-import com.kakao.s2graph.core.types.{HBaseType, LabelWithDirection, 
SourceAndTargetVertexIdPair, SourceVertexId}
-import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
/**
 * Rebuilds a [[SnapshotEdge]] from one cell written in the "tall" layout, where both the
 * source and target inner ids are packed into the row key.
 */
class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {

  /**
   * Splits a packed byte into (statusCode, op).
   * The high nibble is extracted with an arithmetic shift, so the status code comes back
   * sign-extended; the low nibble is masked and is therefore always in 0..15.
   */
  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
    val lowNibbleMask = (1 << 4) - 1
    ((byte >> 4).toByte, (byte & lowNibbleMask).toByte)
  }

  /**
   * Decodes the single cell in `_kvs` into a [[SnapshotEdge]].
   *
   * Note that the `version` argument is not consulted here; every decoder is given the
   * label's `schemaVersion` instead.
   *
   * @param queryParam      supplies the label and therefore the schema version
   * @param _kvs            must contain exactly one element (asserted)
   * @param version         unused by this implementation
   * @param cacheElementOpt when defined, its inner ids and labelWithDir are reused instead
   *                        of parsing the row key
   */
  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
                                                   _kvs: Seq[T],
                                                   version: String,
                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
    val kvs = _kvs.map { raw => implicitly[CanSKeyValue[T]].toSKeyValue(raw) }
    assert(kvs.size == 1)

    val kv = kvs.head
    val schemaVer = queryParam.label.schemaVersion
    val cellVersion = kv.timestamp

    /** rowKey: (src id, tgt id) pair ++ 4-byte labelWithDir ++ 1 byte of index seq / isInverted */
    def parseRowV3(kv: SKeyValue, version: String) = {
      val (idPair, idPairLen) =
        SourceAndTargetVertexIdPair.fromBytes(kv.row, 0, kv.row.length, version)
      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, idPairLen, 4))
      val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, idPairLen + 4)

      (idPair.srcInnerId, idPair.tgtInnerId, labelWithDir, labelIdxSeq, isInverted, idPairLen + 4 + 1)
    }

    val (srcInnerId, tgtInnerId, labelWithDir, _, _, _) = cacheElementOpt match {
      case Some(cached) =>
        (cached.srcVertex.innerId, cached.tgtVertex.innerId, cached.labelWithDir, LabelIndex.DefaultSeq, true, 0)
      case None => parseRowV3(kv, schemaVer)
    }

    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
    // NOTE(review): the target id is also built with SourceVertexId - confirm this is intended.
    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)

    // Value layout: status/op byte, props with timestamps, then optionally a pending edge.
    val bytes = kv.value
    val (statusCode, op) = statusCodeWithOp(bytes(0))
    val (rawProps, propsEnd) = bytesToKeyValuesWithTs(bytes, 1, schemaVer)
    val props = rawProps.toMap
    val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong

    val pendingEdgeOpt =
      if (propsEnd == bytes.length) None
      else {
        // Pending edge layout: status/op byte, props with timestamps, 8-byte lock timestamp.
        val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(bytes(propsEnd))
        val (pendingEdgeProps, pendingPropsEnd) = bytesToKeyValuesWithTs(bytes, propsEnd + 1, schemaVer)
        val lockTs = Option(Bytes.toLong(bytes, pendingPropsEnd, 8))

        Option(
          Edge(Vertex(srcVertexId, cellVersion),
            Vertex(tgtVertexId, cellVersion),
            labelWithDir, pendingEdgeOp,
            cellVersion, pendingEdgeProps.toMap,
            statusCode = pendingEdgeStatusCode, lockTs = lockTs))
      }

    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
      labelWithDir, op, cellVersion, props, statusCode = statusCode,
      pendingEdgeOpt = pendingEdgeOpt, lockTs = None)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
deleted file mode 100644
index a507b90..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.snapshotedge.tall
-
-import com.kakao.s2graph.core.SnapshotEdge
-import com.kakao.s2graph.core.mysqls.LabelIndex
-import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, 
StorageSerializable}
-import com.kakao.s2graph.core.types.SourceAndTargetVertexIdPair
-import org.apache.hadoop.hbase.util.Bytes
-
-
/**
 * Serializes a [[SnapshotEdge]] into exactly one [[SKeyValue]] using the "tall" layout.
 *
 * Row key   : (source inner id, target inner id) pair ++ labelWithDir
 *             ++ (LabelIndex.DefaultSeq, isInverted = true)
 * Qualifier : empty
 * Value     : status/op byte ++ props with timestamps, followed - when a pending edge is
 *             present - by the pending edge's status/op byte, its props and its lock timestamp
 *
 * @param snapshotEdge the snapshot edge to serialize
 */
class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
  import StorageSerializable._

  val label = snapshotEdge.label
  // Encode the table name as UTF-8 explicitly; String.getBytes() without a charset
  // depends on the JVM's platform default encoding.
  val table = Bytes.toBytes(label.hbaseTableName)
  val cf = Serializable.edgeCf

  /** Packs `statusCode` into the high nibble and `op` into the low nibble of one byte. */
  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] =
    Array(((statusCode << 4) | op).toByte)

  /** Status/op byte followed by the snapshot edge's own props. */
  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
    propsToKeyValuesWithTs(snapshotEdge.props.toList))

  /**
   * Builds the single key/value cell that represents `snapshotEdge`.
   *
   * @throws NoSuchElementException when a pending edge is present but has no `lockTs`
   */
  override def toKeyValues: Seq[SKeyValue] = {
    val srcIdAndTgtIdBytes =
      SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
    val labelIndexSeqWithIsInvertedBytes =
      labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)

    val row = Bytes.add(srcIdAndTgtIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
    val qualifier = Array.empty[Byte]

    val value = snapshotEdge.pendingEdgeOpt match {
      case None => valueBytes()
      case Some(pendingEdge) =>
        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
        // Same exception type as Option.get, but with a message that names the problem.
        val lockTs = pendingEdge.lockTs.getOrElse(
          throw new NoSuchElementException("pending edge of a snapshot edge must carry a lockTs"))
        val lockBytes = Bytes.toBytes(lockTs)

        Bytes.add(Bytes.add(valueBytes(), opBytes), Bytes.add(propsBytes, lockBytes))
    }

    Seq(SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
deleted file mode 100644
index 1174f50..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
-
-import com.kakao.s2graph.core.mysqls.{LabelIndex, LabelMeta}
-import com.kakao.s2graph.core.storage.StorageDeserializable._
-import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
-import com.kakao.s2graph.core.types.TargetVertexId
-import com.kakao.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-
/**
 * Rebuilds a [[SnapshotEdge]] from one cell written in the "wide" layout, where the
 * target vertex id is stored in the qualifier.
 */
class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {

  /**
   * Splits a packed byte into (statusCode, op).
   * The high nibble is extracted with an arithmetic shift, so the status code comes back
   * sign-extended; the low nibble is masked and is therefore always in 0..15.
   */
  def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
    val lowNibbleMask = (1 << 4) - 1
    ((byte >> 4).toByte, (byte & lowNibbleMask).toByte)
  }

  /**
   * Decodes the single cell in `_kvs` into a [[SnapshotEdge]].
   *
   * Note that the `version` argument is not consulted here; every decoder is given the
   * label's `schemaVersion` instead.
   *
   * @param queryParam      supplies the label and therefore the schema version
   * @param _kvs            must contain exactly one element (asserted)
   * @param version         unused by this implementation
   * @param cacheElementOpt when defined, its source vertex id and labelWithDir are reused
   *                        instead of parsing the row key
   */
  override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
                                                   _kvs: Seq[T],
                                                   version: String,
                                                   cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
    val kvs = _kvs.map { raw => implicitly[CanSKeyValue[T]].toSKeyValue(raw) }
    assert(kvs.size == 1)

    val kv = kvs.head
    val schemaVer = queryParam.label.schemaVersion
    val cellVersion = kv.timestamp

    val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt match {
      case Some(cached) => (cached.srcVertex.id, cached.labelWithDir, LabelIndex.DefaultSeq, true, 0)
      case None => parseRow(kv, schemaVer)
    }

    // The whole qualifier is the target vertex id.
    val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)

    // Value layout: status/op byte, props with timestamps, then optionally a pending edge.
    val bytes = kv.value
    val (statusCode, op) = statusCodeWithOp(bytes(0))
    val (rawProps, propsEnd) = bytesToKeyValuesWithTs(bytes, 1, schemaVer)
    val props = rawProps.toMap
    val ts = props(LabelMeta.timeStampSeq).innerVal.toString.toLong

    val pendingEdgeOpt =
      if (propsEnd == bytes.length) None
      else {
        // Pending edge layout: status/op byte, props with timestamps, 8-byte lock timestamp.
        val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(bytes(propsEnd))
        val (pendingEdgeProps, pendingPropsEnd) = bytesToKeyValuesWithTs(bytes, propsEnd + 1, schemaVer)
        val lockTs = Option(Bytes.toLong(bytes, pendingPropsEnd, 8))

        Option(
          Edge(Vertex(srcVertexId, cellVersion),
            Vertex(tgtVertexId, cellVersion),
            labelWithDir, pendingEdgeOp,
            cellVersion, pendingEdgeProps.toMap,
            statusCode = pendingEdgeStatusCode, lockTs = lockTs))
      }

    SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
      labelWithDir, op, cellVersion, props, statusCode = statusCode,
      pendingEdgeOpt = pendingEdgeOpt, lockTs = None)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
deleted file mode 100644
index e6074d9..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.snapshotedge.wide
-
-import com.kakao.s2graph.core.SnapshotEdge
-import com.kakao.s2graph.core.mysqls.LabelIndex
-import com.kakao.s2graph.core.storage.{SKeyValue, Serializable, 
StorageSerializable}
-import com.kakao.s2graph.core.types.VertexId
-import org.apache.hadoop.hbase.util.Bytes
-
-
/**
 * Serializes a [[SnapshotEdge]] into exactly one [[SKeyValue]] using the "wide" layout.
 *
 * Row key   : source vertex id ++ labelWithDir ++ (LabelIndex.DefaultSeq, isInverted = true)
 * Qualifier : target vertex id
 * Value     : status/op byte ++ props with timestamps, followed - when a pending edge is
 *             present - by the pending edge's status/op byte, its props and its lock timestamp
 *
 * @param snapshotEdge the snapshot edge to serialize
 */
class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
  import StorageSerializable._

  val label = snapshotEdge.label
  // Encode the table name as UTF-8 explicitly; String.getBytes() without a charset
  // depends on the JVM's platform default encoding.
  val table = Bytes.toBytes(label.hbaseTableName)
  val cf = Serializable.edgeCf

  /** Packs `statusCode` into the high nibble and `op` into the low nibble of one byte. */
  def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] =
    Array(((statusCode << 4) | op).toByte)

  /** Status/op byte followed by the snapshot edge's own props. */
  def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
    propsToKeyValuesWithTs(snapshotEdge.props.toList))

  /**
   * Builds the single key/value cell that represents `snapshotEdge`.
   *
   * @throws NoSuchElementException when a pending edge is present but has no `lockTs`
   */
  override def toKeyValues: Seq[SKeyValue] = {
    val srcIdBytes = VertexId.toSourceVertexId(snapshotEdge.srcVertex.id).bytes
    val labelWithDirBytes = snapshotEdge.labelWithDir.bytes
    val labelIndexSeqWithIsInvertedBytes =
      labelOrderSeqWithIsInverted(LabelIndex.DefaultSeq, isInverted = true)

    val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
    val qualifier = VertexId.toTargetVertexId(snapshotEdge.tgtVertex.id).bytes

    val value = snapshotEdge.pendingEdgeOpt match {
      case None => valueBytes()
      case Some(pendingEdge) =>
        val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
        val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
        // Same exception type as Option.get, but with a message that names the problem.
        val lockTs = pendingEdge.lockTs.getOrElse(
          throw new NoSuchElementException("pending edge of a snapshot edge must carry a lockTs"))
        val lockBytes = Bytes.toBytes(lockTs)

        Bytes.add(Bytes.add(valueBytes(), opBytes), Bytes.add(propsBytes, lockBytes))
    }

    Seq(SKeyValue(table, row, cf, qualifier, value, snapshotEdge.version))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
deleted file mode 100644
index e355401..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.vertex
-
-import com.kakao.s2graph.core.storage.{CanSKeyValue, Deserializable}
-import com.kakao.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import com.kakao.s2graph.core.{QueryParam, Vertex}
-import org.apache.hadoop.hbase.util.Bytes
-import scala.collection.mutable.ListBuffer
-
/**
 * Rebuilds a [[Vertex]] from the cells of one row: each cell is either a property
 * (qualifier = property key, value = encoded inner value) or a label-membership marker.
 */
class VertexDeserializable extends Deserializable[Vertex] {

  /**
   * Folds all cells in `_kvs` into one [[Vertex]].
   *
   * The vertex id is parsed from the row of the first cell, and the vertex timestamp is
   * the largest cell timestamp seen. `queryParam` and `cacheElementOpt` are not used.
   *
   * @param _kvs    cells belonging to the vertex; must not be empty
   * @param version schema version handed to the byte decoders
   */
  def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
                                          _kvs: Seq[T],
                                          version: String,
                                          cacheElementOpt: Option[Vertex]): Vertex = {

    val kvs = _kvs.map { raw => implicitly[CanSKeyValue[T]].toSKeyValue(raw) }

    val firstRow = kvs.head.row
    val (vertexId, _) = VertexId.fromBytes(firstRow, 0, firstRow.length, version)

    var maxTs = Long.MinValue
    val propsMap = new collection.mutable.HashMap[Int, InnerValLike]
    val belongLabelIds = new ListBuffer[Int]

    kvs.foreach { cell =>
      // A one-byte qualifier holds the key directly; anything else is decoded as an int.
      val qualifier = cell.qualifier
      val propKey =
        if (qualifier.length == 1) qualifier.head.toInt
        else Bytes.toInt(qualifier)

      if (cell.timestamp > maxTs) maxTs = cell.timestamp

      if (Vertex.isLabelId(propKey)) {
        belongLabelIds += Vertex.toLabelId(propKey)
      } else {
        val encoded = cell.value
        val (value, _) = InnerVal.fromBytes(encoded, 0, encoded.length, version)
        propsMap += (propKey -> value)
      }
    }

    assert(maxTs != Long.MinValue)
    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
  }
}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
 
b/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
deleted file mode 100644
index 0c17592..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.kakao.s2graph.core.storage.serde.vertex
-
-import com.kakao.s2graph.core.Vertex
-import com.kakao.s2graph.core.storage.{SKeyValue, Serializable}
-import org.apache.hadoop.hbase.util.Bytes
-
-
/**
 * Serializes a [[Vertex]] into one [[SKeyValue]] per property plus one per label the
 * vertex belongs to. All cells share the vertex id as row key and `vertex.ts` as timestamp.
 *
 * @param vertex the vertex to serialize
 */
case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {

  val cf = Serializable.vertexCf

  /**
   * Property cells use the encoded property key as qualifier and the property bytes as
   * value; label-membership cells use the encoded `Vertex.toPropKey(labelId)` as qualifier
   * and an empty value.
   */
  override def toKeyValues: Seq[SKeyValue] = {
    val row = vertex.id.bytes
    val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
    val belongsTo = vertex.belongLabelIds.map { labelId =>
      Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte]
    }
    // Plain method-call `.toSeq` instead of the postfix operator form, which needs
    // scala.language.postfixOps and otherwise raises a feature warning.
    (base ++ belongsTo).map { case (qualifier, value) =>
      // Bytes.toBytes encodes the table name as UTF-8; String.getBytes without a charset
      // depends on the JVM's platform default encoding.
      SKeyValue(Bytes.toBytes(vertex.hbaseTableName), row, cf, qualifier, value, vertex.ts)
    }.toSeq
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
deleted file mode 100644
index 4b3b3db..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/HBaseType.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-package com.kakao.s2graph.core.types
-
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Created by shon on 6/6/15.
- */
/** Constants shared by the byte-level (de)serialization helpers of this package. */
object HBaseType {
  // Schema version identifiers.
  val VERSION4: String = "v4"
  val VERSION3: String = "v3"
  val VERSION2: String = "v2"
  val VERSION1: String = "v1"

  // Version used when a caller does not pass one explicitly (previously VERSION2).
  val DEFAULT_VERSION: String = VERSION3

  // Key given to every entry decoded by HBaseDeserializable.bytesToProps.
  val EMPTY_SEQ_BYTE: Byte = Byte.MaxValue

  val DEFAULT_COL_ID: Int = 0
  val bitsForDir: Int = 2
  val maxBytes: Array[Byte] = Bytes.toBytes(Int.MaxValue)
  val toSeqByte: Byte = (-5).toByte

  // Deliberately null: stands in for "no target vertex id present".
  val defaultTgtVertexId: Null = null
}
-
/** Decoders for the length-prefixed property encodings produced by `HBaseSerializable`. */
object HBaseDeserializable {

  import HBaseType._

  /**
   * Unpacks the byte at `offset` into (label index seq, isInverted): the lowest bit is the
   * inverted flag and the remaining bits, shifted right by one, are the index seq.
   */
  // 6 bits is used for index sequence so total index per label is limited to 2^6
  def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): (Byte, Boolean) = {
    val packed = bytes(offset)
    ((packed >> 1).toByte, (packed & 1) != 0)
  }

  /**
   * Decodes `count` (first byte) entries of the form: key byte, encoded inner value.
   *
   * @param length not used by this implementation
   * @return the decoded entries and the position just past the last byte read
   */
  def bytesToKeyValues(bytes: Array[Byte],
                       offset: Int,
                       length: Int,
                       version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = {
    val count = bytes(offset)
    var cursor = offset + 1
    val entries = new Array[(Byte, InnerValLike)](count)
    for (idx <- 0 until count) {
      val key = bytes(cursor)
      cursor += 1
      val (value, consumed) = InnerVal.fromBytes(bytes, cursor, 0, version)
      cursor += consumed
      entries(idx) = key -> value
    }
    (entries, cursor)
  }

  /**
   * Same layout as [[bytesToKeyValues]], but every value is decoded together with its
   * timestamp via `InnerValLikeWithTs.fromBytes`.
   *
   * @return the decoded entries and the position just past the last byte read
   */
  def bytesToKeyValuesWithTs(bytes: Array[Byte],
                             offset: Int,
                             version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLikeWithTs)], Int) = {
    val count = bytes(offset)
    var cursor = offset + 1
    val entries = new Array[(Byte, InnerValLikeWithTs)](count)
    for (idx <- 0 until count) {
      val key = bytes(cursor)
      cursor += 1
      val (value, consumed) = InnerValLikeWithTs.fromBytes(bytes, cursor, 0, version)
      cursor += consumed
      entries(idx) = key -> value
    }
    (entries, cursor)
  }

  /**
   * Decodes `count` (first byte) key-less inner values. Since no key is stored, every
   * entry is paired with `EMPTY_SEQ_BYTE`.
   *
   * @return the decoded entries and the position just past the last byte read
   */
  def bytesToProps(bytes: Array[Byte],
                   offset: Int,
                   version: String = DEFAULT_VERSION): (Array[(Byte, InnerValLike)], Int) = {
    val count = bytes(offset)
    var cursor = offset + 1
    val entries = new Array[(Byte, InnerValLike)](count)
    for (idx <- 0 until count) {
      val (value, consumed) = InnerVal.fromBytes(bytes, cursor, 0, version)
      cursor += consumed
      entries(idx) = EMPTY_SEQ_BYTE -> value
    }
    (entries, cursor)
  }
}
-
/** Encoders for the length-prefixed property layouts read back by `HBaseDeserializable`. */
object HBaseSerializable {

  /**
   * Emits a one-byte count followed by all chunks, concatenated in a single pass.
   * (Appending with `Bytes.add` inside a loop re-copies the accumulated array on every
   * step.) `chunks` is by-name so that nothing is encoded when the size assertion fails.
   */
  private def withCountPrefix(count: Int)(chunks: => Seq[Array[Byte]]): Array[Byte] = {
    assert(count < Byte.MaxValue)
    Array.concat(Array(count.toByte) +: chunks: _*)
  }

  /** Count byte followed by each value's bytes; the keys are not written. */
  def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] =
    withCountPrefix(props.length) {
      props.map { case (_, v) => v.bytes }
    }

  /** Count byte followed by (key byte, value bytes) for every entry. */
  def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] =
    withCountPrefix(props.length) {
      props.flatMap { case (k, v) => Seq(Array(k), v.bytes) }
    }

  /** Count byte followed by (key byte, value-with-timestamp bytes) for every entry. */
  def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] =
    withCountPrefix(props.length) {
      props.flatMap { case (k, v) => Seq(Array(k), v.bytes) }
    }

  /**
   * Packs the label index seq and the inverted flag into one byte: the seq is shifted
   * left by one and the flag occupies the lowest bit. The seq must be below 2^6 (asserted).
   */
  def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): Array[Byte] = {
    assert(labelOrderSeq < (1 << 6))
    val invertedBit = if (isInverted) 1 else 0
    Array(((labelOrderSeq << 1) | invertedBit).toByte)
  }
}
-
/** Implemented by values that can produce a byte-array encoding of themselves. */
trait HBaseSerializable {
  /** The encoded form of this value. */
  def bytes: Array[Byte]
}
-
/** Implemented by companions that can decode an [[HBaseSerializable]] from raw bytes. */
trait HBaseDeserializable {

  import HBaseType._

  /**
   * Decodes one value from `bytes`, starting at `offset`.
   *
   * @param version schema version to decode with; defaults to `DEFAULT_VERSION`
   * @return the decoded value together with an Int that callers visible in this file
   *         treat as the number of bytes consumed
   */
  def fromBytes(bytes: Array[Byte],
                offset: Int,
                len: Int,
                version: String = DEFAULT_VERSION): (HBaseSerializable, Int)

  /** Creates (does not throw) the exception used to report an unsupported schema version. */
  def notSupportedEx(version: String): RuntimeException =
    new RuntimeException(s"not supported version, $version")
}
-
/**
 * Variant of the byte decoder contract whose `fromBytes` additionally takes an
 * `isVertexId` flag.
 */
trait HBaseDeserializableWithIsVertexId {

  import HBaseType._

  /**
   * Decodes one value from `bytes`, starting at `offset`.
   *
   * @param version    schema version to decode with; defaults to `DEFAULT_VERSION`
   * @param isVertexId defaults to false; its effect is defined by the implementation
   * @return the decoded value together with an Int that callers visible in this file
   *         treat as the number of bytes consumed
   */
  def fromBytes(bytes: Array[Byte],
                offset: Int,
                len: Int,
                version: String = DEFAULT_VERSION,
                isVertexId: Boolean = false): (HBaseSerializable, Int)

  /** Creates (does not throw) the exception used to report an unsupported schema version. */
  def notSupportedEx(version: String): RuntimeException =
    new RuntimeException(s"not supported version, $version")
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
deleted file mode 100644
index 8146f32..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/InnerValLike.scala
+++ /dev/null
@@ -1,246 +0,0 @@
-package com.kakao.s2graph.core.types
-
-import org.apache.hadoop.hbase.util._
-
-/**
- * Created by shon on 6/6/15.
- */
-object InnerVal extends HBaseDeserializableWithIsVertexId {
-  import HBaseType._
-
-  val order = Order.DESCENDING
-  val stringLenOffset = 7.toByte
-  val maxStringLen = Byte.MaxValue - stringLenOffset
-  val maxMetaByte = Byte.MaxValue
-  val minMetaByte = 0.toByte
-
-  /** supported data type */
-  val BLOB = "blob"
-  val STRING = "string"
-  val DOUBLE = "double"
-  val FLOAT = "float"
-  val LONG = "long"
-  val INT = "integer"
-  val SHORT = "short"
-  val BYTE = "byte"
-  val NUMERICS = List(DOUBLE, FLOAT, LONG, INT, SHORT, BYTE)
-  val BOOLEAN = "boolean"
-
-  def isNumericType(dataType: String): Boolean = {
-    dataType match {
-      case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | 
InnerVal.FLOAT | InnerVal.DOUBLE => true
-      case _ => false
-    }
-  }
-  def toInnerDataType(dataType: String): String = {
-    dataType match {
-      case "blob" => BLOB
-      case "string" | "str" | "s" => STRING
-      case "double" | "d" | "float64" => DOUBLE
-      case "float" | "f" | "float32" => FLOAT
-      case "long" | "l" | "int64" | "integer64" => LONG
-      case "int" | "integer" | "i" | "int32" | "integer32" => INT
-      case "short" | "int16" | "integer16" => SHORT
-      case "byte" | "b" | "tinyint" | "int8" | "integer8" => BYTE
-      case "boolean" | "bool" => BOOLEAN
-      case _ => throw new RuntimeException(s"can`t convert $dataType into 
InnerDataType")
-    }
-  }
-
-  def numByteRange(num: BigDecimal) = {
-//    val byteLen =
-//      if (num.isValidByte | num.isValidChar) 1
-//      else if (num.isValidShort) 2
-//      else if (num.isValidInt) 4
-//      else if (num.isValidLong) 8
-//      else if (num.isValidFloat) 4
-//      else 12
-    val byteLen = 12
-    //      else throw new RuntimeException(s"wrong data $num")
-    new SimplePositionedMutableByteRange(byteLen + 4)
-  }
-
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String,
-                isVertexId: Boolean): (InnerValLike, Int) = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal.fromBytes(bytes, 
offset, len, version, isVertexId)
-      case VERSION1 => v1.InnerVal.fromBytes(bytes, offset, len, version, 
isVertexId)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withLong(l: Long, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(l))
-      case VERSION1 => v1.InnerVal(Some(l), None, None)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withInt(i: Int, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(i))
-      case VERSION1 => v1.InnerVal(Some(i.toLong), None, None)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withFloat(f: Float, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => 
v2.InnerVal(BigDecimal(f.toDouble))
-      case VERSION1 => v1.InnerVal(Some(f.toLong), None, None)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withDouble(d: Double, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(BigDecimal(d))
-      case VERSION1 => v1.InnerVal(Some(d.toLong), None, None)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withNumber(num: BigDecimal, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(num)
-      case VERSION1 => v1.InnerVal(Some(num.toLong), None, None)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withBoolean(b: Boolean, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(b)
-      case VERSION1 => v1.InnerVal(None, None, Some(b))
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withBlob(blob: Array[Byte], version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(blob)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-  def withStr(s: String, version: String): InnerValLike = {
-    version match {
-      case VERSION2 | VERSION3 | VERSION4 => v2.InnerVal(s)
-      case VERSION1 => v1.InnerVal(None, Some(s), None)
-      case _ => throw notSupportedEx(version)
-    }
-  }
-
-//  def withInnerVal(innerVal: InnerValLike, version: String): InnerValLike = {
-//    val bytes = innerVal.bytes
-//    version match {
-//      case VERSION2 => v2.InnerVal.fromBytes(bytes, 0, bytes.length, 
version)._1
-//      case VERSION1 => v1.InnerVal.fromBytes(bytes, 0, bytes.length, 
version)._1
-//      case _ => throw notSupportedEx(version)
-//    }
-//  }
-
-  /** nasty implementation for backward compatability */
-  def convertVersion(innerVal: InnerValLike, dataType: String, toVersion: 
String): InnerValLike = {
-    val ret = toVersion match {
-      case VERSION2 | VERSION3 | VERSION4 =>
-        if (innerVal.isInstanceOf[v1.InnerVal]) {
-          val obj = innerVal.asInstanceOf[v1.InnerVal]
-          obj.valueType match {
-            case "long" => InnerVal.withLong(obj.longV.get, toVersion)
-            case "string" => InnerVal.withStr(obj.strV.get, toVersion)
-            case "boolean" => InnerVal.withBoolean(obj.boolV.get, toVersion)
-            case _ => throw new Exception(s"InnerVal should be 
[long/integeer/short/byte/string/boolean]")
-          }
-        } else {
-          innerVal
-        }
-      case VERSION1 =>
-        if (innerVal.isInstanceOf[v2.InnerVal]) {
-          val obj = innerVal.asInstanceOf[v2.InnerVal]
-          obj.value match {
-            case str: String => InnerVal.withStr(str, toVersion)
-            case b: Boolean => InnerVal.withBoolean(b, toVersion)
-            case n: BigDecimal => InnerVal.withNumber(n, toVersion)
-            case n: Long => InnerVal.withNumber(n, toVersion)
-            case n: Double => InnerVal.withNumber(n, toVersion)
-            case n: Int => InnerVal.withNumber(n, toVersion)
-            case _ => throw notSupportedEx(s"v2 to v1: $obj -> $toVersion")
-          }
-        } else {
-          innerVal
-        }
-      case _ => throw notSupportedEx(toVersion)
-    }
-//    logger.debug(s"convertVersion: $innerVal, $dataType, $toVersion, $ret, 
${innerVal.bytes.toList}, ${ret.bytes.toList}")
-    ret
-  }
-
-}
-
-trait InnerValLike extends HBaseSerializable {
-
-  val value: Any
-
-  def compare(other: InnerValLike): Int
-
-  def +(other: InnerValLike): InnerValLike
-
-  def <(other: InnerValLike) = this.compare(other) < 0
-
-  def <=(other: InnerValLike) = this.compare(other) <= 0
-
-  def >(other: InnerValLike) = this.compare(other) > 0
-
-  def >=(other: InnerValLike) = this.compare(other) >= 0
-
-  override def toString(): String = value.toString
-
-  override def hashCode(): Int = value.hashCode()
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case other: InnerValLike =>
-        val ret = toString == obj.toString
-//        logger.debug(s"InnerValLike.equals($this, $obj) => $ret")
-        ret
-      case _ => false
-    }
-  }
-  def hashKey(dataType: String): Int
-
-  def toIdString(): String
-
-}
-
-object InnerValLikeWithTs extends HBaseDeserializable {
-  import HBaseType._
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String = DEFAULT_VERSION): (InnerValLikeWithTs, Int) 
= {
-    val (innerVal, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, 
version)
-    val ts = Bytes.toLong(bytes, offset + numOfBytesUsed)
-    (InnerValLikeWithTs(innerVal, ts), numOfBytesUsed + 8)
-  }
-
-  def withLong(l: Long, ts: Long, version: String): InnerValLikeWithTs = {
-    InnerValLikeWithTs(InnerVal.withLong(l, version), ts)
-  }
-
-  def withStr(s: String, ts: Long, version: String): InnerValLikeWithTs = {
-    InnerValLikeWithTs(InnerVal.withStr(s, version), ts)
-  }
-}
-
-case class InnerValLikeWithTs(innerVal: InnerValLike, ts: Long)
-  extends HBaseSerializable {
-
-  def bytes: Array[Byte] = {
-    Bytes.add(innerVal.bytes, Bytes.toBytes(ts))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala
deleted file mode 100644
index 6207ad7..0000000
--- 
a/s2core/src/main/scala/com/kakao/s2graph/core/types/LabelWithDirection.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.kakao.s2graph.core.types
-
-import com.kakao.s2graph.core.GraphUtil
-import com.kakao.s2graph.core.utils.logger
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Created by shon on 6/6/15.
- */
-object LabelWithDirection {
-
-  import HBaseType._
-
-  def apply(compositeInt: Int): LabelWithDirection = {
-    //      logger.debug(s"CompositeInt: $compositeInt")
-
-    val dir = compositeInt & ((1 << bitsForDir) - 1)
-    val labelId = compositeInt >> bitsForDir
-    LabelWithDirection(labelId, dir)
-  }
-
-  def labelOrderSeqWithIsInverted(labelOrderSeq: Byte, isInverted: Boolean): 
Array[Byte] = {
-    assert(labelOrderSeq < (1 << 6))
-    val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
-    Array.fill(1)(byte.toByte)
-  }
-
-  def bytesToLabelIndexSeqWithIsInverted(bytes: Array[Byte], offset: Int): 
(Byte, Boolean) = {
-    val byte = bytes(offset)
-    val isInverted = if ((byte & 1) != 0) true else false
-    val labelOrderSeq = byte >> 1
-    (labelOrderSeq.toByte, isInverted)
-  }
-}
-
-case class LabelWithDirection(labelId: Int, dir: Int) extends 
HBaseSerializable {
-
-  import HBaseType._
-
-  assert(dir < (1 << bitsForDir))
-  assert(labelId < (Int.MaxValue >> bitsForDir))
-
-  lazy val labelBits = labelId << bitsForDir
-
-  lazy val compositeInt = labelBits | dir
-
-  def bytes = {
-     Bytes.toBytes(compositeInt)
-  }
-
-  lazy val dirToggled = LabelWithDirection(labelId, GraphUtil.toggleDir(dir))
-
-  def updateDir(newDir: Int) = LabelWithDirection(labelId, newDir)
-
-  def isDirected = dir == 0 || dir == 1
-
-  override def hashCode(): Int = compositeInt
-
-  override def equals(other: Any): Boolean = {
-    other match {
-      case o: LabelWithDirection => hashCode == o.hashCode()
-      case _ => false
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala
deleted file mode 100644
index 13637bf..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/VertexId.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.kakao.s2graph.core.types
-
-import com.kakao.s2graph.core.GraphUtil
-import com.kakao.s2graph.core.types.HBaseType._
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Created by shon on 6/10/15.
- */
-object VertexId extends HBaseDeserializable {
-  import HBaseType._
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String = DEFAULT_VERSION): (VertexId, Int) = {
-    /** since murmur hash is prepended, skip numOfBytes for murmur hash */
-    var pos = offset + GraphUtil.bytesForMurMurHash
-
-    val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, 
version, isVertexId = true)
-    pos += numOfBytesUsed
-    val colId = Bytes.toInt(bytes, pos, 4)
-    (VertexId(colId, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed + 
4)
-  }
-
-  def apply(colId: Int, innerId: InnerValLike): VertexId = new VertexId(colId, 
innerId)
-
-  def toSourceVertexId(vid: VertexId) = {
-    SourceVertexId(vid.colId, vid.innerId)
-  }
-
-  def toTargetVertexId(vid: VertexId) = {
-    TargetVertexId(vid.colId, vid.innerId)
-  }
-}
-
-class VertexId protected (val colId: Int, val innerId: InnerValLike) extends 
HBaseSerializable {
-  val storeHash: Boolean = true
-  val storeColId: Boolean = true
-  lazy val hashBytes =
-//    if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toString))
-    if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toIdString()))
-    else Array.empty[Byte]
-
-  lazy val colIdBytes: Array[Byte] =
-    if (storeColId) Bytes.toBytes(colId)
-    else Array.empty[Byte]
-
-  def bytes: Array[Byte] = Bytes.add(hashBytes, innerId.bytes, colIdBytes)
-
-  override def toString(): String = {
-    colId.toString() + "," + innerId.toString()
-//    s"VertexId($colId, $innerId)"
-  }
-
-  override def hashCode(): Int = {
-    val ret = if (storeColId) {
-      colId * 31 + innerId.hashCode()
-    } else {
-      innerId.hashCode()
-    }
-//    logger.debug(s"VertexId.hashCode: $ret")
-    ret
-  }
-  override def equals(obj: Any): Boolean = {
-    val ret = obj match {
-      case other: VertexId => colId == other.colId && innerId.toIdString() == 
other.innerId.toIdString()
-      case _ => false
-    }
-//    logger.debug(s"VertexId.equals: $this, $obj => $ret")
-    ret
-  }
-
-  def compareTo(other: VertexId): Int = {
-    Bytes.compareTo(bytes, other.bytes)
-  }
-  def <(other: VertexId): Boolean = compareTo(other) < 0
-  def <=(other: VertexId): Boolean = compareTo(other) <= 0
-  def >(other: VertexId): Boolean = compareTo(other) > 0
-  def >=(other: VertexId): Boolean = compareTo(other) >= 0
-}
-
-object SourceVertexId extends HBaseDeserializable {
-  import HBaseType._
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String = DEFAULT_VERSION): (VertexId, Int) = {
-    /** since murmur hash is prepended, skip numOfBytes for murmur hash */
-    val pos = offset + GraphUtil.bytesForMurMurHash
-    val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, 
version, isVertexId = true)
-
-    (SourceVertexId(DEFAULT_COL_ID, innerId), GraphUtil.bytesForMurMurHash + 
numOfBytesUsed)
-  }
-
-}
-
-
-case class SourceVertexId(override val colId: Int,
-                          override val innerId: InnerValLike) extends 
VertexId(colId, innerId) {
-  override val storeColId: Boolean = false
-}
-
-object TargetVertexId extends HBaseDeserializable {
-  import HBaseType._
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String = DEFAULT_VERSION): (VertexId, Int) = {
-    /** murmur has is not prepended so start from offset */
-    val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, 
version, isVertexId = true)
-    (TargetVertexId(DEFAULT_COL_ID, innerId), numOfBytesUsed)
-  }
-}
-
-case class TargetVertexId(override val colId: Int,
-                          override val innerId: InnerValLike)
-  extends  VertexId(colId, innerId) {
-  override val storeColId: Boolean = false
-  override val storeHash: Boolean = false
-
-}
-
-object SourceAndTargetVertexIdPair extends HBaseDeserializable {
-  val delimiter = ":"
-  import HBaseType._
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String = DEFAULT_VERSION): 
(SourceAndTargetVertexIdPair, Int) = {
-    val pos = offset + GraphUtil.bytesForMurMurHash
-    val (srcId, srcBytesLen) = InnerVal.fromBytes(bytes, pos, len, version, 
isVertexId = true)
-    val (tgtId, tgtBytesLen) = InnerVal.fromBytes(bytes, pos + srcBytesLen, 
len, version, isVertexId = true)
-    (SourceAndTargetVertexIdPair(srcId, tgtId), GraphUtil.bytesForMurMurHash + 
srcBytesLen + tgtBytesLen)
-  }
-}
-
-case class SourceAndTargetVertexIdPair(val srcInnerId: InnerValLike, val 
tgtInnerId: InnerValLike) extends HBaseSerializable {
-  val colId = DEFAULT_COL_ID
-  import SourceAndTargetVertexIdPair._
-  override def bytes: Array[Byte] = {
-    val hashBytes = Bytes.toBytes(GraphUtil.murmur3(srcInnerId + delimiter + 
tgtInnerId))
-    Bytes.add(hashBytes, srcInnerId.bytes, tgtInnerId.bytes)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala
deleted file mode 100644
index 5111cbc..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/v1/InnerVal.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-package com.kakao.s2graph.core.types.v1
-
-import com.kakao.s2graph.core.GraphExceptions.IllegalDataTypeException
-import com.kakao.s2graph.core.types._
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Created by shon on 6/6/15.
- */
-object InnerVal extends HBaseDeserializableWithIsVertexId {
-  import HBaseType._
-  //  val defaultVal = new InnerVal(None, None, None)
-  val stringLenOffset = 7.toByte
-  val maxStringLen = Byte.MaxValue - stringLenOffset
-  val maxMetaByte = Byte.MaxValue
-  val minMetaByte = 0.toByte
-  /**
-   * first byte encoding rule.
-   * 0 => default
-   * 1 => long
-   * 2 => int
-   * 3 => short
-   * 4 => byte
-   * 5 => true
-   * 6 => false
-   * 7 ~ 127 => string len + 7
-   */
-  val metaByte = Map("default" -> 0, "long" -> 1, "int" -> 2, "short" -> 3,
-    "byte" -> 4, "true" -> 5, "false" -> 6).map {
-    case (k, v) => (k, v.toByte)
-  }
-  val metaByteRev = metaByte.map { case (k, v) => (v.toByte, k) } ++ 
metaByte.map { case (k, v) => ((-v).toByte, k) }
-
-  def maxIdVal(dataType: String) = {
-    dataType match {
-      case "string" => InnerVal.withStr((0 until (Byte.MaxValue - 
stringLenOffset)).map("~").mkString)
-      case "long" => InnerVal.withLong(Long.MaxValue)
-      case "bool" => InnerVal.withBoolean(true)
-      case _ => throw IllegalDataTypeException(dataType)
-    }
-  }
-
-  def minIdVal(dataType: String) = {
-    dataType match {
-      case "string" => InnerVal.withStr("")
-      case "long" => InnerVal.withLong(1)
-      case "bool" => InnerVal.withBoolean(false)
-      case _ => throw IllegalDataTypeException(dataType)
-    }
-  }
-
-  def fromBytes(bytes: Array[Byte], offset: Int, len: Int, version: String = 
DEFAULT_VERSION, isVertexId: Boolean = false): (InnerVal, Int) = {
-    var pos = offset
-    //
-    val header = bytes(pos)
-    //      logger.debug(s"${bytes(offset)}: ${bytes.toList.slice(pos, 
bytes.length)}")
-    pos += 1
-
-    var numOfBytesUsed = 0
-    val (longV, strV, boolV) = metaByteRev.get(header) match {
-      case Some(s) =>
-        s match {
-          case "default" =>
-            (None, None, None)
-          case "true" =>
-            numOfBytesUsed = 0
-            (None, None, Some(true))
-          case "false" =>
-            numOfBytesUsed = 0
-            (None, None, Some(false))
-          case "byte" =>
-            numOfBytesUsed = 1
-            val b = bytes(pos)
-            val value = if (b >= 0) Byte.MaxValue - b else Byte.MinValue - b - 
1
-            (Some(value.toLong), None, None)
-          case "short" =>
-            numOfBytesUsed = 2
-            val b = Bytes.toShort(bytes, pos, 2)
-            val value = if (b >= 0) Short.MaxValue - b else Short.MinValue - b 
- 1
-            (Some(value.toLong), None, None)
-          case "int" =>
-            numOfBytesUsed = 4
-            val b = Bytes.toInt(bytes, pos, 4)
-            val value = if (b >= 0) Int.MaxValue - b else Int.MinValue - b - 1
-            (Some(value.toLong), None, None)
-          case "long" =>
-            numOfBytesUsed = 8
-            val b = Bytes.toLong(bytes, pos, 8)
-            val value = if (b >= 0) Long.MaxValue - b else Long.MinValue - b - 
1
-            (Some(value.toLong), None, None)
-        }
-      case _ => // string
-        val strLen = header - stringLenOffset
-        numOfBytesUsed = strLen
-        (None, Some(Bytes.toString(bytes, pos, strLen)), None)
-    }
-
-    (InnerVal(longV, strV, boolV), numOfBytesUsed + 1)
-  }
-
-  def withLong(l: Long): InnerVal = {
-    //      if (l < 0) throw new IllegalDataRangeException("value shoudl be >= 
0")
-    InnerVal(Some(l), None, None)
-  }
-
-  def withStr(s: String): InnerVal = {
-    InnerVal(None, Some(s), None)
-  }
-
-  def withBoolean(b: Boolean): InnerVal = {
-    InnerVal(None, None, Some(b))
-  }
-
-  /**
-   * In natural order
-   * -129, -128 , -2, -1 < 0 < 1, 2, 127, 128
-   *
-   * In byte order
-   * 0 < 1, 2, 127, 128 < -129, -128, -2, -1
-   *
-   */
-  def transform(l: Long): (Byte, Array[Byte]) = {
-    if (Byte.MinValue <= l && l <= Byte.MaxValue) {
-      //        val value = if (l < 0) l - Byte.MinValue else l + Byte.MinValue
-      val key = if (l >= 0) metaByte("byte") else -metaByte("byte")
-      val value = if (l >= 0) Byte.MaxValue - l else Byte.MinValue - l - 1
-      val valueBytes = Array.fill(1)(value.toByte)
-      (key.toByte, valueBytes)
-    } else if (Short.MinValue <= l && l <= Short.MaxValue) {
-      val key = if (l >= 0) metaByte("short") else -metaByte("short")
-      val value = if (l >= 0) Short.MaxValue - l else Short.MinValue - l - 1
-      val valueBytes = Bytes.toBytes(value.toShort)
-      (key.toByte, valueBytes)
-    } else if (Int.MinValue <= l && l <= Int.MaxValue) {
-      val key = if (l >= 0) metaByte("int") else -metaByte("int")
-      val value = if (l >= 0) Int.MaxValue - l else Int.MinValue - l - 1
-      val valueBytes = Bytes.toBytes(value.toInt)
-      (key.toByte, valueBytes)
-    } else if (Long.MinValue <= l && l <= Long.MaxValue) {
-      val key = if (l >= 0) metaByte("long") else -metaByte("long")
-      val value = if (l >= 0) Long.MaxValue - l else Long.MinValue - l - 1
-      val valueBytes = Bytes.toBytes(value.toLong)
-      (key.toByte, valueBytes)
-    } else {
-      throw new Exception(s"InnerVal range is out: $l")
-    }
-  }
-}
-
-case class InnerVal(longV: Option[Long], strV: Option[String], boolV: 
Option[Boolean])
-  extends HBaseSerializable with InnerValLike {
-
-  import InnerVal._
-
-  lazy val isDefault = longV.isEmpty && strV.isEmpty && boolV.isEmpty
-  val value = (longV, strV, boolV) match {
-    case (Some(l), None, None) => l
-    case (None, Some(s), None) => s
-    case (None, None, Some(b)) => b
-    case _ => throw new Exception(s"InnerVal should be 
[long/integeer/short/byte/string/boolean]")
-  }
-  def valueType = (longV, strV, boolV) match {
-    case (Some(l), None, None) => "long"
-    case (None, Some(s), None) => "string"
-    case (None, None, Some(b)) => "boolean"
-    case _ => throw new Exception(s"InnerVal should be 
[long/integeer/short/byte/string/boolean]")
-  }
-
-  def compare(other: InnerValLike): Int = {
-    if (!other.isInstanceOf[InnerVal]) {
-      throw new RuntimeException(s"compare between $this vs $other is not 
supported")
-    } else {
-//      (value, other.value) match {
-//        case (v1: Long, v2: Long) => v1.compare(v2)
-//        case (b1: Boolean, b2: Boolean) => b1.compare(b2)
-//        case (s1: String, s2: String) => s1.compare(s2)
-//        case _ => throw new Exception("Please check a type of the compare 
operands")
-//      }
-      Bytes.compareTo(bytes, other.bytes) * -1
-    }
-  }
-
-  def +(other: InnerValLike) = {
-    if (!other.isInstanceOf[InnerVal]) {
-      throw new RuntimeException(s"+ between $this vs $other is not supported")
-    } else {
-      (value, other.value) match {
-        case (v1: Long, v2: Long) => InnerVal.withLong(v1 + v2)
-        case (b1: Boolean, b2: Boolean) => InnerVal.withBoolean(if (b2) !b1 
else b1)
-        case _ => throw new Exception("Please check a type of the incr 
operands")
-      }
-    }
-  }
-
-  def bytes = {
-    val (meta, valBytes) = (longV, strV, boolV) match {
-      case (None, None, None) =>
-        (metaByte("default"), Array.empty[Byte])
-      case (Some(l), None, None) =>
-        transform(l)
-      case (None, None, Some(b)) =>
-        val meta = if (b) metaByte("true") else metaByte("false")
-        (meta, Array.empty[Byte])
-      case (None, Some(s), None) =>
-        val sBytes = Bytes.toBytes(s)
-        if (sBytes.length > maxStringLen) {
-          throw new IllegalDataTypeException(s"string in innerVal maxSize is 
$maxStringLen, given ${sBytes.length}")
-        }
-        assert(sBytes.length <= maxStringLen)
-        val meta = (stringLenOffset + sBytes.length).toByte
-        (meta, sBytes)
-      case _ => throw new IllegalDataTypeException("innerVal data type should 
be [long/string/bool]")
-    }
-    Bytes.add(Array.fill(1)(meta.toByte), valBytes)
-  }
-
-  override def toString(): String = {
-    value.toString
-  }
-  override def hashKey(dataType: String): Int = {
-    value.toString.hashCode()
-  }
-  override def toIdString(): String = {
-    value.toString
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala
deleted file mode 100644
index 630e53d..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/types/v2/InnerVal.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-package com.kakao.s2graph.core.types.v2
-
-import com.kakao.s2graph.core.types._
-import org.apache.hadoop.hbase.util._
-
-/**
- * Created by shon on 6/6/15.
- */
-object InnerVal extends HBaseDeserializableWithIsVertexId {
-
-  import HBaseType._
-
-  val order = Order.DESCENDING
-
-  def fromBytes(bytes: Array[Byte],
-                offset: Int,
-                len: Int,
-                version: String = DEFAULT_VERSION,
-                isVertexId: Boolean = false): (InnerVal, Int) = {
-    val pbr = new SimplePositionedByteRange(bytes)
-    pbr.setPosition(offset)
-    val startPos = pbr.getPosition
-    if (bytes(offset) == -1 | bytes(offset) == 0) {
-      /** simple boolean */
-      val boolean = order match {
-        case Order.DESCENDING => bytes(offset) == 0
-        case _ => bytes(offset) == -1
-      }
-      (InnerVal(boolean), 1)
-    }
-    else {
-      if (OrderedBytes.isNumeric(pbr)) {
-        val numeric = OrderedBytes.decodeNumericAsBigDecimal(pbr)
-        if (isVertexId) (InnerVal(numeric.longValue()), pbr.getPosition - 
startPos)
-        else (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos)
-//        (InnerVal(numeric.doubleValue()), pbr.getPosition - startPos)
-//        (InnerVal(BigDecimal(numeric)), pbr.getPosition - startPos)
-      } else if (OrderedBytes.isText(pbr)) {
-        val str = OrderedBytes.decodeString(pbr)
-        (InnerVal(str), pbr.getPosition - startPos)
-      } else if (OrderedBytes.isBlobVar(pbr)) {
-        val blobVar = OrderedBytes.decodeBlobVar(pbr)
-        (InnerVal(blobVar), pbr.getPosition - startPos)
-      } else {
-        throw new RuntimeException("!!")
-      }
-    }
-  }
-}
-
-case class InnerVal(value: Any) extends HBaseSerializable with InnerValLike {
-
-  import com.kakao.s2graph.core.types.InnerVal._
-
-  def bytes: Array[Byte] = {
-    val ret = value match {
-      case b: Boolean =>
-
-        /** since OrderedBytes header start from 0x05, it is safe to use -1, 0
-          * for decreasing order (true, false) */
-        //        Bytes.toBytes(b)
-        order match {
-          case Order.DESCENDING => if (b) Array(0.toByte) else Array(-1.toByte)
-          case _ => if (!b) Array(0.toByte) else Array(-1.toByte)
-        }
-      case d: Double =>
-        val num = BigDecimal(d)
-        val pbr = numByteRange(num)
-        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
-        pbr.getBytes().take(len)
-      case l: Long =>
-        val num = BigDecimal(l)
-        val pbr = numByteRange(num)
-        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
-        pbr.getBytes().take(len)
-      case i: Int =>
-        val num = BigDecimal(i)
-        val pbr = numByteRange(num)
-        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
-        pbr.getBytes().take(len)
-      case sh: Short =>
-        val num = BigDecimal(sh)
-        val pbr = numByteRange(num)
-        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
-        pbr.getBytes().take(len)
-      case b: Byte =>
-        val num = BigDecimal(b)
-        val pbr = numByteRange(num)
-        val len = OrderedBytes.encodeNumeric(pbr, num.bigDecimal, order)
-        pbr.getBytes().take(len)
-
-
-      case b: BigDecimal =>
-        val pbr = numByteRange(b)
-        val len = OrderedBytes.encodeNumeric(pbr, b.bigDecimal, order)
-        pbr.getBytes().take(len)
-      case s: String =>
-        val pbr = new SimplePositionedMutableByteRange(s.getBytes.length + 3)
-        val len = OrderedBytes.encodeString(pbr, s, order)
-        pbr.getBytes().take(len)
-      case blob: Array[Byte] =>
-        val len = OrderedBytes.blobVarEncodedLength(blob.length)
-        val pbr = new SimplePositionedMutableByteRange(len)
-        val totalLen = OrderedBytes.encodeBlobVar(pbr, blob, order)
-        pbr.getBytes().take(totalLen)
-    }
-    //    println(s"$value => ${ret.toList}, ${ret.length}")
-    ret
-  }
-
-//
-  override def hashKey(dataType: String): Int = {
-    if (value.isInstanceOf[String]) {
-      // since we use dummy stringn value for degree edge.
-      value.toString.hashCode()
-    } else {
-      dataType match {
-        case BYTE => 
value.asInstanceOf[BigDecimal].bigDecimal.byteValue().hashCode()
-        case FLOAT => 
value.asInstanceOf[BigDecimal].bigDecimal.floatValue().hashCode()
-        case DOUBLE => 
value.asInstanceOf[BigDecimal].bigDecimal.doubleValue().hashCode()
-        case LONG => 
value.asInstanceOf[BigDecimal].bigDecimal.longValue().hashCode()
-        case INT => 
value.asInstanceOf[BigDecimal].bigDecimal.intValue().hashCode()
-        case SHORT => 
value.asInstanceOf[BigDecimal].bigDecimal.shortValue().hashCode()
-        case STRING => value.toString.hashCode
-        case _ => throw new RuntimeException(s"NotSupportede type: $dataType")
-      }
-    }
-  }
-
-  def compare(other: InnerValLike): Int = {
-    if (!other.isInstanceOf[InnerValLike])
-      throw new RuntimeException(s"compare $this vs $other")
-    Bytes.compareTo(bytes, other.bytes) * -1
-  }
-
-  def +(other: InnerValLike): InnerValLike = {
-    if (!other.isInstanceOf[InnerValLike])
-      throw new RuntimeException(s"+ $this, $other")
-
-    (value, other.value) match {
-      case (v1: BigDecimal, v2: BigDecimal) => new 
InnerVal(BigDecimal(v1.bigDecimal.add(v2.bigDecimal)))
-      case _ => throw new RuntimeException("+ operation on inner val is for 
big decimal pair")
-    }
-  }
-
-  //need to be removed ??
-  override def toString(): String = {
-//    value.toString()
-    value match {
-      case n: BigDecimal => n.bigDecimal.toPlainString
-      case _ => value.toString
-    }
-  }
-
-  override def toIdString(): String = {
-    value match {
-      case n: BigDecimal => n.bigDecimal.longValue().toString()
-      case _ => value.toString
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
deleted file mode 100644
index 6777c28..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/DeferCache.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.kakao.s2graph.core.utils
-
-import java.util.concurrent.TimeUnit
-
-import com.google.common.cache.CacheBuilder
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.Config
-
-import scala.concurrent.ExecutionContext
-
-class DeferCache[R](config: Config)(implicit ex: ExecutionContext) {
-
-  import com.kakao.s2graph.core.utils.Extensions.DeferOps
-
-  type Value = (Long, Deferred[R])
-
-  private val maxSize = config.getInt("future.cache.max.size")
-  private val expireAfterWrite = 
config.getInt("future.cache.expire.after.write")
-  private val expireAfterAccess = 
config.getInt("future.cache.expire.after.access")
-
-  private val futureCache = CacheBuilder.newBuilder()
-  .initialCapacity(maxSize)
-  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
-  .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
-  .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
-  .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]()
-
-
-  def asMap() = futureCache.asMap()
-
-  def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey)
-
-  private def checkAndExpire(cacheKey: Long,
-                             cachedAt: Long,
-                             cacheTTL: Long,
-                             oldDefer: Deferred[R])(op: => Deferred[R]): 
Deferred[R] = {
-    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
-      // future is too old. so need to expire and fetch new data from storage.
-      futureCache.asMap().remove(cacheKey)
-
-      val newPromise = new Deferred[R]()
-      val now = System.currentTimeMillis()
-
-      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
-        case null =>
-          // only one thread succeed to come here concurrently
-          // initiate fetch to storage then add callback on complete to finish 
promise.
-          op withCallback { value =>
-            newPromise.callback(value)
-            value
-          }
-          newPromise
-        case (cachedAt, oldDefer) => oldDefer
-      }
-    } else {
-      // future is not to old so reuse it.
-      oldDefer
-    }
-  }
-  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): 
Deferred[R] = {
-    val cacheVal = futureCache.getIfPresent(cacheKey)
-    cacheVal match {
-      case null =>
-        val promise = new Deferred[R]()
-        val now = System.currentTimeMillis()
-        val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, 
(now, promise)) match {
-          case null =>
-            op.withCallback { value =>
-              promise.callback(value)
-              value
-            }
-            (now, promise)
-          case oldVal => oldVal
-        }
-        checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op)
-
-      case (cachedAt, defer) =>
-        checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
deleted file mode 100644
index eea9a79..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Extentions.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.kakao.s2graph.core.utils
-
-import com.stumbleupon.async.{Callback, Deferred}
-import com.typesafe.config.Config
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-object Extensions {
-
-
-  def retryOnSuccess[T](maxRetryNum: Int, n: Int = 1)(fn: => 
Future[T])(shouldStop: T => Boolean)(implicit ex: ExecutionContext): Future[T] 
= n match {
-    case i if n <= maxRetryNum =>
-      fn.flatMap { result =>
-        if (!shouldStop(result)) {
-          logger.info(s"retryOnSuccess $n")
-          retryOnSuccess(maxRetryNum, n + 1)(fn)(shouldStop)
-        } else {
-          Future.successful(result)
-        }
-      }
-    case _ => fn
-  }
-
-  def retryOnFailure[T](maxRetryNum: Int, n: Int = 1)(fn: => 
Future[T])(fallback: => T)(implicit ex: ExecutionContext): Future[T] = n match {
-    case i if n <= maxRetryNum =>
-      fn recoverWith { case t: Throwable =>
-        logger.info(s"retryOnFailure $n $t")
-        retryOnFailure(maxRetryNum, n + 1)(fn)(fallback)
-      }
-    case _ =>
-      Future.successful(fallback)
-  }
-
-
-  implicit class DeferOps[T](d: Deferred[T])(implicit ex: ExecutionContext) {
-
-    def withCallback[R](op: T => R): Deferred[R] = {
-      d.addCallback(new Callback[R, T] {
-        override def call(arg: T): R = op(arg)
-      })
-    }
-
-    def recoverWith(op: Exception => T): Deferred[T] = {
-      d.addErrback(new Callback[Deferred[T], Exception] {
-        override def call(e: Exception): Deferred[T] = 
Deferred.fromResult(op(e))
-      })
-    }
-
-
-    def toFuture: Future[T] = {
-      val promise = Promise[T]
-
-      d.addBoth(new Callback[Unit, T] {
-        def call(arg: T) = arg match {
-          case e: Exception => promise.failure(e)
-          case _ => promise.success(arg)
-        }
-      })
-
-      promise.future
-    }
-
-    def toFutureWith(fallback: => T): Future[T] = {
-      toFuture recoverWith { case t: Throwable => Future.successful(fallback) }
-    }
-
-  }
-
-  implicit class ConfigOps(config: Config) {
-    def getBooleanWithFallback(key: String, defaultValue: Boolean): Boolean =
-      if (config.hasPath(key)) config.getBoolean(key) else defaultValue
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala
deleted file mode 100644
index 17d9e8f..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/FutureCache.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.kakao.s2graph.core.utils
-
-import java.util.concurrent.TimeUnit
-
-import com.google.common.cache.CacheBuilder
-import com.typesafe.config.Config
-
-import scala.concurrent.{Promise, Future, ExecutionContext}
-
-
-class FutureCache[R](config: Config)(implicit ex: ExecutionContext) {
-
-  type Value = (Long, Future[R])
-
-  private val maxSize = config.getInt("future.cache.max.size")
-  private val expireAfterWrite = 
config.getInt("future.cache.expire.after.write")
-  private val expireAfterAccess = 
config.getInt("future.cache.expire.after.access")
-
-  private val futureCache = CacheBuilder.newBuilder()
-  .initialCapacity(maxSize)
-  .concurrencyLevel(Runtime.getRuntime.availableProcessors())
-  .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
-  .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
-  .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]()
-
-
-  def asMap() = futureCache.asMap()
-
-  def getIfPresent(cacheKey: Long): Value = {
-    val (cachedAt, promise) = futureCache.getIfPresent(cacheKey)
-    (cachedAt, promise.future)
-  }
-
-  private def checkAndExpire(cacheKey: Long,
-                             cachedAt: Long,
-                             cacheTTL: Long,
-                             oldFuture: Future[R])(op: => Future[R]): 
Future[R] = {
-    if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
-      // future is too old. so need to expire and fetch new data from storage.
-      futureCache.asMap().remove(cacheKey)
-
-      val newPromise = Promise[R]
-      val now = System.currentTimeMillis()
-
-      futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
-        case null =>
-          // only one thread succeed to come here concurrently
-          // initiate fetch to storage then add callback on complete to finish 
promise.
-          op.onSuccess { case value =>
-            newPromise.success(value)
-            value
-          }
-          newPromise.future
-        case (cachedAt, oldPromise) => oldPromise.future
-      }
-    } else {
-      // future is not to old so reuse it.
-      oldFuture
-    }
-  }
-  def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): 
Future[R] = {
-    val cacheVal = futureCache.getIfPresent(cacheKey)
-    cacheVal match {
-      case null =>
-        val promise = Promise[R]
-        val now = System.currentTimeMillis()
-        val (cachedAt, cachedPromise) = 
futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
-          case null =>
-            op.onSuccess { case value =>
-              promise.success(value)
-              value
-            }
-            (now, promise)
-          case oldVal => oldVal
-        }
-        checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op)
-
-      case (cachedAt, cachedPromise) =>
-        checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
deleted file mode 100644
index d281017..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/Logger.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-package com.kakao.s2graph.core.utils
-
-import play.api.libs.json.JsValue
-import org.slf4j.LoggerFactory
-
-import scala.language.{higherKinds, implicitConversions}
-
-object logger {
-
-  trait Loggable[T] {
-    def toLogMessage(msg: T): String
-  }
-
-  object Loggable {
-    implicit val stringLoggable = new Loggable[String] {
-      def toLogMessage(msg: String) = msg
-    }
-
-    implicit def numericLoggable[T: Numeric] = new Loggable[T] {
-      def toLogMessage(msg: T) = msg.toString
-    }
-
-    implicit val jsonLoggable = new Loggable[JsValue] {
-      def toLogMessage(msg: JsValue) = msg.toString()
-    }
-
-    implicit val booleanLoggable = new Loggable[Boolean] {
-      def toLogMessage(msg: Boolean) = msg.toString()
-    }
-  }
-
-  private val logger = LoggerFactory.getLogger("application")
-  private val errorLogger = LoggerFactory.getLogger("error")
-
-  def info[T: Loggable](msg: => T) = 
logger.info(implicitly[Loggable[T]].toLogMessage(msg))
-
-  def debug[T: Loggable](msg: => T) = 
logger.debug(implicitly[Loggable[T]].toLogMessage(msg))
-
-  def error[T: Loggable](msg: => T, exception: => Throwable) = 
errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
-
-  def error[T: Loggable](msg: => T) = 
errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg))
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala
deleted file mode 100644
index d383b47..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/utils/SafeUpdateCache.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.kakao.s2graph.core.utils
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import com.google.common.cache.CacheBuilder
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-object SafeUpdateCache {
-
-  case class CacheKey(key: String)
-
-}
-
-class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit 
executionContext: ExecutionContext) {
-
-  import SafeUpdateCache._
-
-  implicit class StringOps(key: String) {
-    def toCacheKey = new CacheKey(prefix + ":" + key)
-  }
-
-  def toTs() = (System.currentTimeMillis() / 1000).toInt
-
-  private val cache = 
CacheBuilder.newBuilder().maximumSize(maxSize).build[CacheKey, (T, Int, 
AtomicBoolean)]()
-
-  def put(key: String, value: T) = cache.put(key.toCacheKey, (value, toTs, new 
AtomicBoolean(false)))
-
-  def invalidate(key: String) = cache.invalidate(key.toCacheKey)
-
-  def withCache(key: String)(op: => T): T = {
-    val cacheKey = key.toCacheKey
-    val cachedValWithTs = cache.getIfPresent(cacheKey)
-
-    if (cachedValWithTs == null) {
-      // fetch and update cache.
-      val newValue = op
-      cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false)))
-      newValue
-    } else {
-      val (cachedVal, updatedAt, isUpdating) = cachedValWithTs
-      if (toTs() < updatedAt + ttl) cachedVal // in cache TTL
-      else {
-        val running = isUpdating.getAndSet(true)
-        if (running) cachedVal
-        else {
-          Future(op)(executionContext) onComplete {
-            case Failure(ex) =>
-              cache.put(cacheKey, (cachedVal, toTs(), new 
AtomicBoolean(false))) // keep old value
-              logger.error(s"withCache update failed: $cacheKey")
-            case Success(newValue) =>
-              cache.put(cacheKey, (newValue, toTs(), new 
AtomicBoolean(false))) // update new value
-              logger.info(s"withCache update success: $cacheKey")
-          }
-          cachedVal
-        }
-      }
-    }
-  }
-}
-

Reply via email to