http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelMeta.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelMeta.scala
deleted file mode 100644
index 3cd7995..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/LabelMeta.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-/**
- * Created by shon on 6/3/15.
- */
-
-
-import com.kakao.s2graph.core.JSONParser
-import com.kakao.s2graph.core.GraphExceptions.MaxPropSizeReachedException
-import play.api.libs.json.Json
-import scalikejdbc._
-
-/**
- * Companion of [[LabelMeta]]: declares the reserved property metas and offers cached
- * lookups and mutations against the `label_metas` table.
- */
-object LabelMeta extends Model[LabelMeta] with JSONParser {
-
-  /** dummy sequences */
-
-  val fromSeq = -4.toByte
-  val toSeq = -5.toByte
-  val lastOpSeq = -3.toByte
-  val lastDeletedAt = -2.toByte
-  val timeStampSeq = 0.toByte
-  val countSeq = (Byte.MaxValue - 2).toByte
-  val degreeSeq = (Byte.MaxValue - 1).toByte
-  val maxValue = Byte.MaxValue
-  val emptyValue = Byte.MaxValue
-
-  /** reserved sequences */
-  //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
-  //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
-  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
-    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "long")
-  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
-    seq = toSeq, defaultValue = toSeq.toString, dataType = "long")
-  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
-    seq = timeStampSeq, defaultValue = "0", dataType = "long")
-  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
-    seq = degreeSeq, defaultValue = "0", dataType = "long")
-  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
-    seq = countSeq, defaultValue = "-1", dataType = "long")
-
-  // Every reserved meta is listed twice, once under its '_'-prefixed name and once without the
-  // prefix (e.g. _timestamp / timestamp); both entries share the same seq. The list is reversed
-  // after expansion.
-  val reservedMetas = List(from, to, degree, timestamp, count).flatMap { lm =>
-    List(lm, lm.copy(name = lm.name.drop(1)))
-  }.reverse
-  val reservedMetasInner = List(from, to, degree, timestamp, count)
-
-  /** Builds a LabelMeta from a `label_metas` row; `data_type` is lower-cased. */
-  def apply(rs: WrappedResultSet): LabelMeta = {
-    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"),
-      rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase)
-  }
-
-  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq
-  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq
-
-  /**
-   * Looks a meta up by primary key through the option cache.
-   * Calls `.get` on the result, so a missing row raises NoSuchElementException.
-   */
-  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
-    val cacheKey = "id=" + id
-
-    withCache(cacheKey) {
-      sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
-    }.get
-  }
-
-  /** All metas of a label ordered by seq ascending; `useCache = false` bypasses the list cache. */
-  def findAllByLabelId(labelId: Int, useCache: Boolean = true)
-                      (implicit session: DBSession = AutoSession): List[LabelMeta] = {
-    val cacheKey = "labelId=" + labelId
-    lazy val labelMetas = sql"""select *
-                                from label_metas
-                                where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()
-
-    if (useCache) withCaches(cacheKey)(labelMetas)
-    else labelMetas
-  }
-
-  /**
-   * Finds a meta by name. The reserved names `_timestamp`, `_from` and `_to` are answered
-   * without touching the database; any other name is looked up for the given label.
-   */
-  def findByName(labelId: Int, name: String, useCache: Boolean = true)
-                (implicit session: DBSession = AutoSession): Option[LabelMeta] = {
-    name match {
-      case timestamp.name => Some(timestamp)
-      case from.name => Some(from)
-      case to.name => Some(to)
-      case _ =>
-        val cacheKey = "labelId=" + labelId + ":name=" + name
-        lazy val labelMeta = sql"""
-            select *
-            from label_metas where label_id = ${labelId} and name = ${name}"""
-          .map { rs => LabelMeta(rs) }.single.apply()
-
-        if (useCache) withCache(cacheKey)(labelMeta)
-        else labelMeta
-    }
-  }
-
-  /**
-   * Inserts a new meta for the label and returns the generated key.
-   *
-   * The seq is one above the highest seq currently stored for the label (1 when the label has
-   * no metas). Counting rows instead would hand out a seq that is still in use once any meta
-   * has been deleted. The seq must also stay below `countSeq`, so user properties never take
-   * the reserved `countSeq` / `degreeSeq` values (the same bound as `isValidSeqForAdmin`).
-   *
-   * @throws MaxPropSizeReachedException when no seq below `countSeq` is left
-   */
-  def insert(labelId: Int, name: String, defaultValue: String, dataType: String)
-            (implicit session: DBSession = AutoSession) = {
-    val ls = findAllByLabelId(labelId, useCache = false)
-    val seq = if (ls.isEmpty) 1 else ls.map(_.seq.toInt).max + 1
-
-    if (seq < countSeq) {
-      sql"""insert into label_metas(label_id, name, seq, default_value, data_type)
-    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply()
-    } else {
-      throw MaxPropSizeReachedException("max property size reached")
-    }
-  }
-
-  /**
-   * Returns the existing meta with this name, or inserts it, expires the affected cache
-   * entries and re-reads it straight from the database.
-   */
-  def findOrInsert(labelId: Int,
-                   name: String,
-                   defaultValue: String,
-                   dataType: String)(implicit session: DBSession = AutoSession): LabelMeta = {
-
-    findByName(labelId, name) match {
-      case Some(c) => c
-      case None =>
-        insert(labelId, name, defaultValue, dataType)
-        val cacheKey = "labelId=" + labelId + ":name=" + name
-        val cacheKeys = "labelId=" + labelId
-        expireCache(cacheKey)
-        expireCaches(cacheKeys)
-        findByName(labelId, name, useCache = false).get
-    }
-  }
-
-  /** Deletes the row and expires every cache key under which it may have been stored. */
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val labelMeta = findById(id)
-    val (labelId, name) = (labelMeta.labelId, labelMeta.name)
-    sql"""delete from label_metas where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  /** Loads every row and pre-populates the caches by id, by (labelId, name), by (labelId, seq) and by labelId. */
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      cacheKey -> x
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
-      cacheKey -> x
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
-      cacheKey -> x
-    })
-
-    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
-      val cacheKey = s"labelId=${labelId}"
-      cacheKey -> ls
-    }.toList)
-  }
-}
-
-/** Definition of one label property; `toJson` exposes its name, default value and data type. */
-case class LabelMeta(id: Option[Int],
-                     labelId: Int,
-                     name: String,
-                     seq: Byte,
-                     defaultValue: String,
-                     dataType: String) extends JSONParser {
-  lazy val toJson = Json.obj(
-    "name" -> name,
-    "defaultValue" -> defaultValue,
-    "dataType" -> dataType
-  )
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
deleted file mode 100644
index d41e9fa..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-import java.util.concurrent.Executors
-
-import com.kakao.s2graph.core.utils.{SafeUpdateCache, logger}
-import com.typesafe.config.Config
-import scalikejdbc._
-
-import scala.concurrent.ExecutionContext
-import scala.language.{higherKinds, implicitConversions}
-import scala.util.{Failure, Try}
-
-/** Process-wide settings and helpers shared by every model: cache limits, connection pool, transactions. */
-object Model {
-  var maxSize = 10000
-  var ttl = 60
-  val numOfThread = Runtime.getRuntime.availableProcessors()
-  val threadPool = Executors.newFixedThreadPool(numOfThread)
-  val ec = ExecutionContext.fromExecutor(threadPool)
-
-  /** Reads cache limits from `config`, loads the JDBC driver and installs the singleton connection pool. */
-  def apply(config: Config) = {
-    maxSize = config.getInt("cache.max.size")
-    ttl = config.getInt("cache.ttl.seconds")
-    Class.forName(config.getString("db.default.driver"))
-
-    val poolSettings = ConnectionPoolSettings(
-      initialSize = 10,
-      maxSize = 10,
-      connectionTimeoutMillis = 30000L,
-      validationQuery = "select 1;")
-
-    val url = config.getString("db.default.url")
-    val user = config.getString("db.default.user")
-    val password = config.getString("db.default.password")
-    ConnectionPool.singleton(url, user, password, poolSettings)
-  }
-
-  /**
-   * Runs `block` inside a transaction on a borrowed connection.
-   * The transaction is committed when the block returns; when it fails with an Exception the
-   * transaction is rolled back (if active) and the failure is returned.
-   */
-  def withTx[T](block: DBSession => T): Try[T] =
-    using(DB(ConnectionPool.borrow())) { conn =>
-      val attempt = Try {
-        conn.begin()
-        val result = block(conn.withinTxSession())
-        conn.commit()
-        result
-      }
-
-      attempt.recoverWith {
-        case e: Exception =>
-          conn.rollbackIfActive()
-          Failure(e)
-      }
-    }
-
-  /** Closes every connection pool. */
-  def shutdown() = ConnectionPool.closeAll()
-
-  /** Calls `findAll()` on each model, which is where the models fill their caches. */
-  def loadCache() = {
-    Service.findAll()
-    ServiceColumn.findAll()
-    Label.findAll()
-    LabelMeta.findAll()
-    LabelIndex.findAll()
-    ColumnMeta.findAll()
-  }
-}
-
-/**
- * Base of every model companion: gives it two local caches keyed by string, one holding
- * `Option[V]` entries and one holding `List[V]` entries, plus shortcuts to read, expire and fill them.
- */
-trait Model[V] extends SQLSyntaxSupport[V] {
-
-  import Model._
-
-  implicit val ec: ExecutionContext = Model.ec
-
-  val cName = this.getClass.getSimpleName()
-  logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")
-
-  val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
-  val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)
-
-  // single-value cache accessors
-  val withCache = optionCache.withCache _
-  val expireCache = optionCache.invalidate _
-
-  // list cache accessors
-  val withCaches = listCache.withCache _
-  val expireCaches = listCache.invalidate _
-
-  /** Stores each value in the option cache, wrapped with `Option(...)`. */
-  def putsToCache(kvs: List[(String, V)]) =
-    for ((key, value) <- kvs) optionCache.put(key, Option(value))
-
-  /** Stores each list in the list cache. */
-  def putsToCaches(kvs: List[(String, List[V])]) =
-    for ((key, values) <- kvs) listCache.put(key, values)
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
deleted file mode 100644
index 2840db8..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-
-import java.util.UUID
-
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{Management}
-import play.api.libs.json.Json
-import scalikejdbc._
-
-/** Companion of [[Service]]: cached lookups and mutations against the `services` table. */
-object Service extends Model[Service] {
-  /** Builds a Service from a `services` row. */
-  def apply(rs: WrappedResultSet): Service = {
-    Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
-      rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
-  }
-
-  /** Looks a service up by access token through the option cache. */
-  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
-    val cacheKey = s"accessToken=$accessToken"
-    withCache(cacheKey)(
-      sql"""select * from services where access_token = ${accessToken}""".map { rs => Service(rs) }.single.apply)
-  }
-
-  /**
-   * Looks a service up by primary key through the option cache.
-   * Calls `.get` on the result, so a missing row raises NoSuchElementException.
-   */
-  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""select * from services where id = ${id}""".map { rs => Service(rs) }.single.apply).get
-  }
-
-  /** Looks a service up by name; `useCache = false` bypasses the option cache. */
-  def findByName(serviceName: String, useCache: Boolean = true)
-                (implicit session: DBSession = AutoSession): Option[Service] = {
-    val cacheKey = "serviceName=" + serviceName
-    lazy val serviceOpt = sql"""
-        select * from services where service_name = ${serviceName}
-      """.map { rs => Service(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(serviceOpt)
-    else serviceOpt
-  }
-
-  /**
-   * Inserts a service row with a freshly generated random UUID as its access token.
-   * `compressionAlgorithm` is logged only; the insert statement has no column for it.
-   */
-  def insert(serviceName: String, cluster: String,
-             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
-             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
-    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
-    val accessToken = UUID.randomUUID().toString()
-    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
-    values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
-  }
-
-  /**
-   * Deletes the service row and expires the cache keys it may be stored under.
-   *
-   * The statement targets `services`, the table every other query in this object uses
-   * (it used to delete from `service_columns`). The `accessToken=` key populated by
-   * `findByAccessToken` is expired as well.
-   */
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val service = findById(id)
-    sql"""delete from services where id = ${id}""".execute.apply()
-    val cacheKeys = List(
-      s"id=$id",
-      s"serviceName=${service.serviceName}",
-      s"accessToken=${service.accessToken}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  /** Returns the service with this name, inserting it first when it does not exist yet. */
-  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
-                   preSplitSize: Int, hTableTTL: Option[Int],
-                   compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Service = {
-    findByName(serviceName) match {
-      case Some(s) => s
-      case None =>
-        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
-        // drop the cache entry for this name so the lookup below reads the new row
-        val cacheKey = "serviceName=" + serviceName
-        expireCache(cacheKey)
-        findByName(serviceName).get
-    }
-  }
-
-  /** Loads every row and pre-populates the option cache by id and by service name. */
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from services""".map { rs => Service(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"serviceName=${x.serviceName}"
-      (cacheKey -> x)
-    })
-  }
-
-  /** Distinct `cluster` values over all services. */
-  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
-    sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply
-  }
-}
-
-/** A service row; `toJson` is an empty JSON object when the service has no id. */
-case class Service(id: Option[Int],
-                   serviceName: String,
-                   accessToken: String,
-                   cluster: String,
-                   hTableName: String,
-                   preSplitSize: Int,
-                   hTableTTL: Option[Int]) {
-  lazy val toJson =
-    id.fold(Json.parse("{}")) { _id =>
-      Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
-        "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ServiceColumn.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ServiceColumn.scala
deleted file mode 100644
index 9ca32a0..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ServiceColumn.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.kakao.s2graph.core.mysqls
-
-/**
- * Created by shon on 6/3/15.
- */
-
-import com.kakao.s2graph.core.JSONParser
-import play.api.libs.json.Json
-import scalikejdbc._
-/** Companion of [[ServiceColumn]]: cached lookups and mutations against the `service_columns` table. */
-object ServiceColumn extends Model[ServiceColumn] {
-
-  /** Builds a ServiceColumn from a `service_columns` row; `column_type` is lower-cased. */
-  def apply(rs: WrappedResultSet): ServiceColumn =
-    ServiceColumn(
-      rs.intOpt("id"),
-      rs.int("service_id"),
-      rs.string("column_name"),
-      rs.string("column_type").toLowerCase(),
-      rs.string("schema_version"))
-
-  /**
-   * Looks a column up by primary key through the option cache.
-   * Calls `.get` on the result, so a missing row raises NoSuchElementException.
-   */
-  def findById(id: Int)(implicit session: DBSession = AutoSession): ServiceColumn = {
-    val cacheKey = "id=" + id
-    val cached = withCache(cacheKey)(
-      sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn(x) }.single.apply)
-    cached.get
-  }
-
-  /** Looks a column up by (serviceId, columnName); `useCache = false` bypasses the option cache. */
-  def find(serviceId: Int, columnName: String, useCache: Boolean = true)
-          (implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
-    val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
-    // one definition of the query, evaluated wherever it is referenced
-    def fromDb = sql"""
-          select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
-        """.map { rs => ServiceColumn(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(fromDb)
-    else fromDb
-  }
-
-  /** Inserts a new column row. */
-  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)
-            (implicit session: DBSession = AutoSession) = {
-    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
-         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
-  }
-
-  /** Deletes the row and expires the cache keys it may be stored under. */
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val target = findById(id)
-    sql"""delete from service_columns where id = ${id}""".execute.apply()
-    val staleKeys = List(s"id=$id", s"serviceId=${target.serviceId}:columnName=${target.columnName}")
-    for (key <- staleKeys) {
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  /** Returns the column for (serviceId, columnName), inserting it first when it does not exist yet. */
-  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)
-                  (implicit session: DBSession = AutoSession): ServiceColumn =
-    find(serviceId, columnName).getOrElse {
-      insert(serviceId, columnName, columnType, schemaVersion)
-      // drop the cache entry for this key so the lookup below reads the new row
-      expireCache("serviceId=" + serviceId + ":columnName=" + columnName)
-      find(serviceId, columnName).get
-    }
-
-  /** Loads every row and pre-populates the option cache by id and by (serviceId, columnName). */
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val all = sql"""select * from service_columns""".map { rs => ServiceColumn(rs) }.list.apply
-    putsToCache(all.map(x => s"id=${x.id.get}" -> x))
-    putsToCache(all.map(x => s"serviceId=${x.serviceId}:columnName=${x.columnName}" -> x))
-  }
-}
-/** A column of a service, with lazily resolved owning service and column metas. */
-case class ServiceColumn(id: Option[Int],
-                         serviceId: Int,
-                         columnName: String,
-                         columnType: String,
-                         schemaVersion: String) extends JSONParser {
-
-  lazy val service = Service.findById(serviceId)
-  lazy val metas = ColumnMeta.findAllByColumn(id.get)
-  // meta name -> meta
-  lazy val metasInvMap = metas.map(meta => meta.name -> meta).toMap
-  // seq -> meta name, with ColumnMeta.lastModifiedAtColumn prepended to the stored metas
-  lazy val metaNamesMap =
-    (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)).toMap
-  lazy val toJson = Json.obj(
-    "serviceName" -> service.serviceName,
-    "columnName" -> columnName,
-    "columnType" -> columnType)
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/parsers/WhereParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/parsers/WhereParser.scala
deleted file mode 100644
index 2a62b3b..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/parsers/WhereParser.scala
+++ /dev/null
@@ -1,207 +0,0 @@
-package com.kakao.s2graph.core.parsers
-
-import com.kakao.s2graph.core.GraphExceptions.WhereParserException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.types.InnerValLike
-
-import scala.annotation.tailrec
-import scala.util.Try
-import scala.util.parsing.combinator.JavaTokenParsers
-
-/**
- * Resolves where-clause operands against an edge. A property key may be prefixed with one
- * or more `_parent.` segments, each of which moves one step up the edge's parent chain.
- */
-trait ExtractValue extends JSONParser {
-  val parent = "_parent."
-
-  /**
-   * Value of property `key` on `edge`, or on an ancestor edge when the key is `_parent.`-prefixed.
-   * `_from` / `_to` resolve to the source / target vertex id; a property the edge does not carry
-   * falls back to the meta's default value.
-   *
-   * @throws WhereParserException when the label has no property with that name
-   */
-  def propToInnerVal(edge: Edge, key: String) = {
-    val (propKey, parentEdge) = findParentEdge(edge, key)
-
-    val label = parentEdge.label
-    val metaPropInvMap = label.metaPropsInvMap
-    val labelMeta = metaPropInvMap.getOrElse(propKey,
-      throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
-    val metaSeq = labelMeta.seq
-
-    metaSeq match {
-      case LabelMeta.from.seq => parentEdge.srcVertex.innerId
-      case LabelMeta.to.seq => parentEdge.tgtVertex.innerId
-      case _ => parentEdge.propsWithTs.get(metaSeq) match {
-        case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion)
-        case Some(edgeVal) => edgeVal.innerVal
-      }
-    }
-  }
-
-  /**
-   * Right-hand operand of a comparison. When `value` names a property (or is `_parent.`-prefixed)
-   * it is resolved like a property; otherwise the literal is converted using the data type of
-   * the property named by `key`.
-   *
-   * @throws WhereParserException when the label has no property with that name
-   */
-  def valueToCompare(edge: Edge, key: String, value: String) = {
-    val label = edge.label
-    if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value)
-    else {
-      val (propKey, _) = findParentEdge(edge, key)
-
-      // NOTE(review): the meta is looked up on `edge.label` even when `key` points at a parent edge - confirm
-      val labelMeta = label.metaPropsInvMap.getOrElse(propKey,
-        throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
-      val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.labelWithDir.dir)
-      val dataType = propKey match {
-        case "_to" | "to" => tgtColumn.columnType
-        case "_from" | "from" => srcColumn.columnType
-        case _ => labelMeta.dataType
-      }
-      toInnerVal(value, dataType, label.schemaVersion)
-    }
-  }
-
-  /** Walks `depth` steps up the parent chain, always following the first parent edge. */
-  @tailrec
-  private def findParent(edge: Edge, depth: Int): Edge =
-    if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1)
-    else edge
-
-  /** Strips leading `_parent.` segments from `key`, counting them on top of `depth`. */
-  @tailrec
-  private def stripParentPrefixes(key: String, depth: Int): (String, Int) =
-    if (key.startsWith(parent)) stripParentPrefixes(key.substring(parent.length), depth + 1)
-    else (key, depth)
-
-  /**
-   * Splits `key` into the bare property name and the edge it refers to.
-   *
-   * The prefix is matched literally and only at the start of the key. `String.split` would
-   * treat `_parent.` as a regular expression (its `.` matches any character), would also cut
-   * at occurrences in the middle of a key, and returns an empty array for a key that is just
-   * `_parent.`.
-   */
-  private def findParentEdge(edge: Edge, key: String): (String, Edge) = {
-    val (propKey, depth) = stripParentPrefixes(key, 0)
-    (propKey, findParent(edge, depth))
-  }
-}
-
-/** One predicate of a where-clause; clauses combine through `and` / `or`. */
-trait Clause extends ExtractValue {
-  /** Conjunction of this clause and `otherField`. */
-  def and(otherField: Clause): Clause = And(this, otherField)
-
-  /** Disjunction of this clause and `otherField`. */
-  def or(otherField: Clause): Clause = Or(this, otherField)
-
-  /** Whether `edge` satisfies this clause. */
-  def filter(edge: Edge): Boolean
-
-  /** Resolves the property and the comparison value on `edge`, in that order, then applies `binOp`. */
-  def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)
-              (propKey: String, value: String)(edge: Edge): Boolean =
-    binOp(propToInnerVal(edge, propKey), valueToCompare(edge, propKey, value))
-}
-
-/** Conjunction of clauses: an edge passes when every clause accepts it; no clauses accepts everything. */
-case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) {
-  // forall is true for an empty Seq and stops at the first rejecting clause, so no
-  // intermediate collection is built and later clauses are skipped, as with And.
-  def filter(edge: Edge) = clauses.forall(_.filter(edge))
-}
-
-/** Accepts edges whose property is greater than the comparison value. */
-case class Gt(propKey: String, value: String) extends Clause {
-  override def filter(edge: Edge): Boolean =
-    binaryOp((lhs, rhs) => lhs > rhs)(propKey, value)(edge)
-}
-
-/** Accepts edges whose property is less than the comparison value. */
-case class Lt(propKey: String, value: String) extends Clause {
-  override def filter(edge: Edge): Boolean =
-    binaryOp((lhs, rhs) => lhs < rhs)(propKey, value)(edge)
-}
-
-/** Accepts edges whose property equals the comparison value. */
-case class Eq(propKey: String, value: String) extends Clause {
-  override def filter(edge: Edge): Boolean =
-    binaryOp((lhs, rhs) => lhs == rhs)(propKey, value)(edge)
-}
-
-/**
- * IN-clause whose candidate values are converted once, at construction, against `label`.
- * `_to` / `to` use the target column type, `_from` / `from` the source column type, and any
- * other key the data type of its label meta.
- */
-case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause {
-  val innerValLikeLs = for (value <- values) yield {
-    // looked up per value, so an unknown property only throws when there is a value to convert
-    val labelMeta = label.metaPropsInvMap.getOrElse(propKey,
-      throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
-    val dataType =
-      if (propKey == "_to" || propKey == "to") label.tgtColumn.columnType
-      else if (propKey == "_from" || propKey == "from") label.srcColumn.columnType
-      else labelMeta.dataType
-    toInnerVal(value, dataType, label.schemaVersion)
-  }
-
-  override def filter(edge: Edge): Boolean =
-    innerValLikeLs.contains(propToInnerVal(edge, propKey))
-}
-
-/** IN-clause that resolves each candidate value against the edge at filter time. */
-case class IN(propKey: String, values: Set[String]) extends Clause {
-  override def filter(edge: Edge): Boolean = {
-    val target = propToInnerVal(edge, propKey)
-    values.exists(candidate => valueToCompare(edge, propKey, candidate) == target)
-  }
-}
-
-/** Accepts edges whose property lies in the inclusive range [minValue, maxValue]. */
-case class Between(propKey: String, minValue: String, maxValue: String) extends Clause {
-  override def filter(edge: Edge): Boolean = {
-    val current = propToInnerVal(edge, propKey)
-    val lower = valueToCompare(edge, propKey, minValue)
-    val upper = valueToCompare(edge, propKey, maxValue)
-
-    if (lower <= current) current <= upper else false
-  }
-}
-
-/** Negation of the wrapped clause. */
-case class Not(self: Clause) extends Clause {
-  override def filter(edge: Edge) = if (self.filter(edge)) false else true
-}
-
-/** Accepts an edge when both clauses accept it; `right` is only evaluated when `left` accepts. */
-case class And(left: Clause, right: Clause) extends Clause {
-  override def filter(edge: Edge) = if (left.filter(edge)) right.filter(edge) else false
-}
-
-/** Accepts an edge when either clause accepts it; `right` is only evaluated when `left` rejects. */
-case class Or(left: Clause, right: Clause) extends Clause {
-  override def filter(edge: Edge) = if (left.filter(edge)) true else right.filter(edge)
-}
-
-object WhereParser {
-  /** A Where with no clauses, which accepts every edge. */
-  val success = Where(Seq.empty[Clause])
-}
-
-/**
- * Parser for where-clauses evaluated against edges of `label`.
- *
- * Supported predicates: `=`, `!=`, `>`, `>=`, `<`, `<=`, `between a and b`, `in (...)` and
- * `not in (...)`, combined with `and` / `or` and parentheses.
- */
-case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser {
-
-  // a value token: anything up to whitespace, a parenthesis or a comma
-  val anyStr = "[^\\s(),]+".r
-
-  val and = "and|AND".r
-
-  val or = "or|OR".r
-
-  val between = "between|BETWEEN".r
-
-  val in = "in|IN".r
-
-  val notIn = "not in|NOT IN".r
-
-  def where: Parser[Where] = rep(clause) ^^ (Where(_))
-
-  def paren: Parser[Clause] = "(" ~> clause <~ ")"
-
-  // predicates or parenthesised clauses chained with and / or
-  def clause: Parser[Clause] = (predicate | paren) * (
-    and ^^^ { (a: Clause, b: Clause) => And(a, b) } |
-      or ^^^ { (a: Clause, b: Clause) => Or(a, b) })
-
-  // At least one identifier is required: with repsep an empty property name was accepted
-  // (e.g. "= 3" parsed to Eq("", "3")) and the error only showed up when the filter ran.
-  def identWithDot: Parser[String] = rep1sep(ident, ".") ^^ { case values => values.mkString(".") }
-
-  def predicate = {
-    identWithDot ~ ("!=" | "=") ~ anyStr ^^ {
-      case f ~ op ~ s =>
-        if (op == "=") Eq(f, s)
-        else Not(Eq(f, s))
-    } | identWithDot ~ (">=" | "<=" | ">" | "<") ~ anyStr ^^ {
-      case f ~ op ~ s => op match {
-        case ">" => Gt(f, s)
-        case ">=" => Or(Gt(f, s), Eq(f, s))
-        case "<" => Lt(f, s)
-        case "<=" => Or(Lt(f, s), Eq(f, s))
-      }
-    } | identWithDot ~ (between ~> anyStr <~ and) ~ anyStr ^^ {
-      case f ~ minV ~ maxV => Between(f, minV, maxV)
-    } | identWithDot ~ (notIn | in) ~ ("(" ~> repsep(anyStr, ",") <~ ")") ^^ {
-      case f ~ op ~ values =>
-        // keys that walk up the parent chain are resolved per edge; others are converted up front
-        val inClause =
-          if (f.startsWith("_parent")) IN(f, values.toSet)
-          else InWithoutParent(label, f, values.toSet)
-        if (op.toLowerCase == "in") inClause
-        else Not(inClause)
-
-
-      case _ => throw WhereParserException(s"Failed to parse where clause. ")
-    }
-  }
-
-  /** Parses the whole input; any failure is returned as a Failure wrapping WhereParserException. */
-  def parse(sql: String): Try[Where] = Try {
-    parseAll(where, sql) match {
-      case Success(r, q) => r
-      case fail => throw WhereParserException(s"Where parsing error: ${fail.toString}")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
deleted file mode 100644
index f8129db..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala
+++ /dev/null
@@ -1,609 +0,0 @@
-package com.kakao.s2graph.core.rest
-
-import java.util.concurrent.{Callable, TimeUnit}
-
-import com.google.common.cache.{CacheLoader, CacheBuilder}
-import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, 
ModelNotFoundException}
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.parsers.{Where, WhereParser}
-import com.kakao.s2graph.core.types._
-import com.typesafe.config.Config
-import play.api.libs.json._
-import scala.util.{Failure, Success, Try}
-
-object TemplateHelper {
-  val findVar = """\"?\$\{(.*?)\}\"?""".r
-  val num = 
"""(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r
-  val hour = 60 * 60 * 1000L
-  val day = hour * 24L
-  val week = day * 7L
-  def calculate(now: Long, n: Int, unit: String): Long = {
-    val duration = unit match {
-      case "hour" | "HOUR" => n * hour
-      case "day" | "DAY" => n * day
-      case "week" | "WEEK" => n * week
-      case _ => n * day
-    }
-
-    duration + now
-  }
-
-  def replaceVariable(now: Long, body: String): String = {
-    findVar.replaceAllIn(body, m => {
-      val matched = m group 1
-
-      num.replaceSomeIn(matched, m => {
-        val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3))
-        val ts = _pivot match {
-          case null => now
-          case "now" | "NOW" => now
-          case "next_week" | "NEXT_WEEK" => now / week * week + week
-          case "next_day" | "NEXT_DAY" => now / day * day + day
-          case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour
-        }
-
-        if (_pivot == null && n == null && unit == null) None
-        else if (n == null || unit == null) Option(ts.toString)
-        else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString)
-      })
-    })
-  }
-}
-
-class RequestParser(config: Config) extends JSONParser {
-
-  import Management.JsonModel._
-
-  val hardLimit = 100000
-  val defaultLimit = 100
-  val maxLimit = Int.MaxValue - 1
-  val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout")
-  val DefaultMaxAttempt = config.getInt("hbase.client.retries.number")
-  val DefaultCluster = config.getString("hbase.zookeeper.quorum")
-  val DefaultCompressionAlgorithm = 
config.getString("hbase.table.compression.algorithm")
-  val DefaultPhase = config.getString("phase")
-  val parserCache = CacheBuilder.newBuilder()
-    .expireAfterAccess(10000, TimeUnit.MILLISECONDS)
-    .expireAfterWrite(10000, TimeUnit.MILLISECONDS)
-    .maximumSize(10000)
-    .initialCapacity(1000)
-    .build[String, Try[Where]]
-
-  private def extractScoring(labelId: Int, value: JsValue) = {
-    val ret = for {
-      js <- parse[Option[JsObject]](value, "scoring")
-    } yield {
-      for {
-        (k, v) <- js.fields
-        labelOrderType <- LabelMeta.findByName(labelId, k)
-      } yield {
-        val value = v match {
-          case n: JsNumber => n.as[Double]
-          case _ => throw new Exception("scoring weight should be double.")
-        }
-        (labelOrderType.seq, value)
-      }
-    }
-    ret
-  }
-
-  def extractInterval(label: Label, _jsValue: JsValue) = {
-    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), 
_jsValue.toString())
-    val jsValue = Json.parse(replaced)
-
-    def extractKv(js: JsValue) = js match {
-      case JsObject(obj) => obj
-      case JsArray(arr) => arr.flatMap {
-        case JsObject(obj) => obj
-        case _ => throw new RuntimeException(s"cannot support json type $js")
-      }
-      case _ => throw new RuntimeException(s"cannot support json type: $js")
-    }
-
-    val ret = for {
-      js <- parse[Option[JsObject]](jsValue, "interval")
-      fromJs <- (js \ "from").asOpt[JsValue]
-      toJs <- (js \ "to").asOpt[JsValue]
-    } yield {
-      val from = Management.toProps(label, extractKv(fromJs))
-      val to = Management.toProps(label, extractKv(toJs))
-      (from, to)
-    }
-
-    ret
-  }
-
-  def extractDuration(label: Label, _jsValue: JsValue) = {
-    val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), 
_jsValue.toString())
-    val jsValue = Json.parse(replaced)
-
-    for {
-      js <- parse[Option[JsObject]](jsValue, "duration")
-    } yield {
-      val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue)
-      val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue)
-
-      if (minTs > maxTs) {
-        throw new BadQueryException("Duration error. Timestamp of From cannot 
be larger than To.")
-      }
-
-      (minTs, maxTs)
-    }
-  }
-
-  def extractHas(label: Label, jsValue: JsValue) = {
-    val ret = for {
-      js <- parse[Option[JsObject]](jsValue, "has")
-    } yield {
-      for {
-        (k, v) <- js.fields
-        labelMeta <- LabelMeta.findByName(label.id.get, k)
-        value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion)
-      } yield {
-        labelMeta.seq -> value
-      }
-    }
-    ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike])
-  }
-  
-  def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = 
{
-    whereClauseOpt match {
-      case None => Success(WhereParser.success)
-      case Some(_where) =>
-        val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), 
_where)
-        val whereParserKey = s"${label.label}_${where}"
-        parserCache.get(whereParserKey, new Callable[Try[Where]] {
-          override def call(): Try[Where] = {
-            WhereParser(label).parse(where) match {
-              case s@Success(_) => s
-              case Failure(ex) => throw BadQueryException(ex.getMessage, ex)
-            }
-          }
-        })
-    }
-  }
-
-  def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): 
Seq[Vertex] = {
-    val vertices = for {
-      label <- Label.findByName(labelName).toSeq
-      serviceColumn = if (direction == "out") label.srcColumn else 
label.tgtColumn
-      id <- ids
-      innerId <- jsValueToInnerVal(id, serviceColumn.columnType, 
label.schemaVersion)
-    } yield {
-      Vertex(SourceVertexId(serviceColumn.id.get, innerId), 
System.currentTimeMillis())
-    }
-    vertices.toSeq
-  }
-
-  def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery 
= {
-    val queries = for {
-      queryJson <- (jsValue \ 
"queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty)
-    } yield {
-      toQuery(queryJson, isEdgeQuery)
-    }
-    val weights = (jsValue \ 
"weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0))
-    MultiQuery(queries = queries, weights = weights,
-      queryOption = toQueryOption(jsValue), jsonQuery = jsValue)
-  }
-
-  def toQueryOption(jsValue: JsValue): QueryOption = {
-    val filterOutFields = (jsValue \ 
"filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name))
-    val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => 
toQuery(v) }.map { q =>
-      q.copy(queryOption = q.queryOption.copy(filterOutFields = 
filterOutFields))
-    }
-    val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true)
-    val selectColumns = (jsValue \ 
"select").asOpt[List[String]].getOrElse(List.empty)
-    val groupByColumns = (jsValue \ 
"groupBy").asOpt[List[String]].getOrElse(List.empty)
-    val orderByColumns: List[(String, Boolean)] = (jsValue \ 
"orderBy").asOpt[List[JsObject]].map { jsLs =>
-      for {
-        js <- jsLs
-        (column, orderJs) <- js.fields
-      } yield {
-        val ascending = orderJs.as[String].toUpperCase match {
-          case "ASC" => true
-          case "DESC" => false
-        }
-        column -> ascending
-      }
-    }.getOrElse(List("score" -> false, "timestamp" -> false))
-    val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true)
-    val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false)
-    //TODO: Refactor this
-    val limitOpt = (jsValue \ "limit").asOpt[Int]
-    val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true)
-    val scoreThreshold = (jsValue \ 
"scoreThreshold").asOpt[Double].getOrElse(Double.MinValue)
-    val returnDegree = (jsValue \ 
"returnDegree").asOpt[Boolean].getOrElse(true)
-
-    QueryOption(removeCycle = removeCycle,
-      selectColumns = selectColumns,
-      groupByColumns = groupByColumns,
-      orderByColumns = orderByColumns,
-      filterOutQuery = filterOutQuery,
-      filterOutFields = filterOutFields,
-      withScore = withScore,
-      returnTree = returnTree,
-      limitOpt = limitOpt,
-      returnAgg = returnAgg,
-      scoreThreshold = scoreThreshold,
-      returnDegree = returnDegree
-    )
-  }
-  def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = {
-    try {
-      val vertices =
-        (for {
-          value <- parse[List[JsValue]](jsValue, "srcVertices")
-          serviceName = parse[String](value, "serviceName")
-          column = parse[String](value, "columnName")
-        } yield {
-          val service = Service.findByName(serviceName).getOrElse(throw 
BadQueryException("service not found"))
-          val col = ServiceColumn.find(service.id.get, column).getOrElse(throw 
BadQueryException("bad column name"))
-          val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ 
"ids").asOpt[List[JsValue]])
-          for {
-            idVal <- idOpt ++ idsOpt.toSeq.flatten
-
-            /** bug, need to use labels schemaVersion  */
-            innerVal <- jsValueToInnerVal(idVal, col.columnType, 
col.schemaVersion)
-          } yield {
-            Vertex(SourceVertexId(col.id.get, innerVal), 
System.currentTimeMillis())
-          }
-        }).flatten
-
-      if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is 
empty")
-      val steps = parse[Vector[JsValue]](jsValue, "steps")
-
-      val queryOption = toQueryOption(jsValue)
-
-      val querySteps =
-        steps.zipWithIndex.map { case (step, stepIdx) =>
-          val labelWeights = step match {
-            case obj: JsObject =>
-              val converted = for {
-                (k, v) <- (obj \ 
"weights").asOpt[JsObject].getOrElse(Json.obj()).fields
-                l <- Label.findByName(k)
-              } yield {
-                l.id.get -> v.toString().toDouble
-              }
-              converted.toMap
-            case _ => Map.empty[Int, Double]
-          }
-          val queryParamJsVals = step match {
-            case arr: JsArray => arr.as[List[JsValue]]
-            case obj: JsObject => (obj \ "step").as[List[JsValue]]
-            case _ => List.empty[JsValue]
-          }
-          val nextStepScoreThreshold = step match {
-            case obj: JsObject => (obj \ 
"nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
-            case _ => QueryParam.DefaultThreshold
-          }
-          val nextStepLimit = step match {
-            case obj: JsObject => (obj \ 
"nextStepLimit").asOpt[Int].getOrElse(-1)
-            case _ => -1
-          }
-          val cacheTTL = step match {
-            case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1)
-            case _ => -1
-          }
-          val queryParams =
-            for {
-              labelGroup <- queryParamJsVals
-              queryParam <- parseQueryParam(labelGroup)
-            } yield {
-              val (_, columnName) =
-                if (queryParam.labelWithDir.dir == 
GraphUtil.directions("out")) {
-                  (queryParam.label.srcService.serviceName, 
queryParam.label.srcColumnName)
-                } else {
-                  (queryParam.label.tgtService.serviceName, 
queryParam.label.tgtColumnName)
-                }
-              //FIXME:
-              if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => 
v.serviceColumn.columnName == columnName)) {
-                throw BadQueryException("srcVertices contains incompatiable 
serviceName or columnName with first step.")
-              }
-
-              queryParam
-            }
-          Step(queryParams.toList, labelWeights = labelWeights,
-            //            scoreThreshold = stepThreshold,
-            nextStepScoreThreshold = nextStepScoreThreshold,
-            nextStepLimit = nextStepLimit,
-            cacheTTL = cacheTTL)
-
-        }
-
-      val ret = Query(vertices, querySteps, queryOption, jsValue)
-      //      logger.debug(ret.toString)
-      ret
-    } catch {
-      case e: BadQueryException =>
-        throw e
-      case e: ModelNotFoundException =>
-        throw BadQueryException(e.getMessage, e)
-      case e: Exception =>
-        throw BadQueryException(s"$jsValue, $e", e)
-    }
-  }
-
-  private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = {
-    for {
-      labelName <- parse[Option[String]](labelGroup, "label")
-    } yield {
-      val label = Label.findByName(labelName).getOrElse(throw 
BadQueryException(s"$labelName not found"))
-      val direction = parse[Option[String]](labelGroup, 
"direction").map(GraphUtil.toDirection(_)).getOrElse(0)
-      val limit = {
-        parse[Option[Int]](labelGroup, "limit") match {
-          case None => defaultLimit
-          case Some(l) if l < 0 => maxLimit
-          case Some(l) if l >= 0 =>
-            val default = hardLimit
-            Math.min(l, default)
-        }
-      }
-      val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0)
-      val interval = extractInterval(label, labelGroup)
-      val duration = extractDuration(label, labelGroup)
-      val scoring = extractScoring(label.id.get, 
labelGroup).getOrElse(List.empty[(Byte, Double)]).toList
-      val exclude = parse[Option[Boolean]](labelGroup, 
"exclude").getOrElse(false)
-      val include = parse[Option[Boolean]](labelGroup, 
"include").getOrElse(false)
-      val hasFilter = extractHas(label, labelGroup)
-      val labelWithDir = LabelWithDirection(label.id.get, direction)
-      val indexNameOpt = (labelGroup \ "index").asOpt[String]
-      val indexSeq = indexNameOpt match {
-        case None => label.indexSeqsMap.get(scoring.map(kv => 
kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq)
-        case Some(indexName) => 
label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new 
RuntimeException("cannot find index"))
-      }
-      val whereClauseOpt = (labelGroup \ "where").asOpt[String]
-      val where = extractWhere(label, whereClauseOpt)
-      val includeDegree = (labelGroup \ 
"includeDegree").asOpt[Boolean].getOrElse(true)
-      val rpcTimeout = (labelGroup \ 
"rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout)
-      val maxAttempt = (labelGroup \ 
"maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt)
-      val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { 
jsVal =>
-        jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, 
label.schemaVersion)
-      }
-      val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L)
-      val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { 
jsVal =>
-        val propName = (jsVal \ 
"propName").asOpt[String].getOrElse(LabelMeta.timestamp.name)
-        val propNameSeq = 
label.metaPropsInvMap.get(propName).map(_.seq).getOrElse(LabelMeta.timeStampSeq)
-        val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0)
-        val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1)
-        if (decayRate >= 1.0 || decayRate <= 0.0) throw new 
BadQueryException("decay rate should be 0.0 ~ 1.0")
-        val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 
24.0)
-        TimeDecay(initial, decayRate, timeUnit, propNameSeq)
-      }
-      val threshold = (labelGroup \ 
"threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold)
-      // TODO: refactor this. dirty
-      val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => 
Query.DuplicatePolicy(s))
-
-      val outputField = (labelGroup \ "outputField").asOpt[String].map(s => 
Json.arr(Json.arr(s)))
-      val transformer = if (outputField.isDefined) outputField else 
(labelGroup \ "transform").asOpt[JsValue]
-      val scorePropagateOp = (labelGroup \ 
"scorePropagateOp").asOpt[String].getOrElse("multiply")
-      val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1)
-      val shouldNormalize = (labelGroup \ 
"normalize").asOpt[Boolean].getOrElse(false)
-      val cursorOpt = (labelGroup \ "cursor").asOpt[String]
-      // FIXME: Order of command matter
-      QueryParam(labelWithDir)
-        .sample(sample)
-        .limit(offset, limit)
-        .rank(RankParam(label.id.get, scoring))
-        .exclude(exclude)
-        .include(include)
-        .duration(duration)
-        .has(hasFilter)
-        .labelOrderSeq(indexSeq)
-        .interval(interval)
-        .where(where)
-        .duplicatePolicy(duplicate)
-        .includeDegree(includeDegree)
-        .rpcTimeout(rpcTimeout)
-        .maxAttempt(maxAttempt)
-        .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt)
-        .cacheTTLInMillis(cacheTTL)
-        .timeDecay(timeDecayFactor)
-        .threshold(threshold)
-        .transformer(transformer)
-        .scorePropagateOp(scorePropagateOp)
-        .shouldNormalize(shouldNormalize)
-        .cursorOpt(cursorOpt)
-    }
-  }
-
-  private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = 
{
-    (js \ key).validate[R]
-      .fold(
-        errors => {
-          val msg = (JsError.toFlatJson(errors) \ 
"obj").as[List[JsValue]].map(x => x \ "msg")
-          val e = Json.obj("args" -> key, "error" -> msg)
-          throw new GraphExceptions.JsonParseException(Json.obj("error" -> 
key).toString)
-        },
-        r => {
-          r
-        })
-  }
-
-  def toJsValues(jsValue: JsValue): List[JsValue] = {
-    jsValue match {
-      case obj: JsObject => List(obj)
-      case arr: JsArray => arr.as[List[JsValue]]
-      case _ => List.empty[JsValue]
-    }
-
-  }
-
-  def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], 
List[JsValue]) = {
-    val jsValues = toJsValues(jsValue)
-    val edges = jsValues.flatMap(toEdge(_, operation))
-
-    (edges, jsValues)
-  }
-
-  def toEdges(jsValue: JsValue, operation: String): List[Edge] = {
-    toJsValues(jsValue).flatMap { edgeJson =>
-      toEdge(edgeJson, operation)
-    }
-  }
-
-
-  private def toEdge(jsValue: JsValue, operation: String): List[Edge] = {
-
-    def parseId(js: JsValue) = js match {
-      case s: JsString => s.as[String]
-      case o@_ => s"${o}"
-    }
-    val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_))
-    val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_))
-    val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(js => parseId(js))) ++ srcId
-    val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms 
=> froms.map(js => parseId(js))) ++ tgtId
-
-    val label = parse[String](jsValue, "label")
-    val timestamp = parse[Long](jsValue, "timestamp")
-    val direction = parse[Option[String]](jsValue, "direction").getOrElse("")
-    val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}")
-    for {
-      srcId <- srcIds
-      tgtId <- tgtIds
-    } yield {
-      Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, 
props.toString)
-    }
-  }
-
-  def toVertices(jsValue: JsValue, operation: String, serviceName: 
Option[String] = None, columnName: Option[String] = None) = {
-    toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName))
-  }
-
-  def toVertex(jsValue: JsValue, operation: String, serviceName: 
Option[String] = None, columnName: Option[String] = None): Vertex = {
-    val id = parse[JsValue](jsValue, "id")
-    val ts = parse[Option[Long]](jsValue, 
"timestamp").getOrElse(System.currentTimeMillis())
-    val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") 
else serviceName.get
-    val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") 
else columnName.get
-    val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj())
-    Management.toVertex(ts, operation, id.toString, sName, cName, 
props.toString)
-  }
-
-  def toPropElements(jsObj: JsValue) = Try {
-    val propName = (jsObj \ "name").as[String]
-    val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
-    val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
-      case JsString(s) => s
-      case _@js => js.toString
-    }
-    Prop(propName, defaultValue, dataType)
-  }
-
-  def toPropsElements(jsValue: JsValue): Seq[Prop] = for {
-    jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil)
-  } yield {
-    val propName = (jsObj \ "name").as[String]
-    val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String])
-    val defaultValue = (jsObj \ "defaultValue").as[JsValue] match {
-      case JsString(s) => s
-      case _@js => js.toString
-    }
-    Prop(propName, defaultValue, dataType)
-  }
-
-  def toIndicesElements(jsValue: JsValue): Seq[Index] = for {
-    jsObj <- jsValue.as[Seq[JsValue]]
-    indexName = (jsObj \ "name").as[String]
-    propNames = (jsObj \ "propNames").as[Seq[String]]
-  } yield Index(indexName, propNames)
-
-  def toLabelElements(jsValue: JsValue) = Try {
-    val labelName = parse[String](jsValue, "label")
-    val srcServiceName = parse[String](jsValue, "srcServiceName")
-    val tgtServiceName = parse[String](jsValue, "tgtServiceName")
-    val srcColumnName = parse[String](jsValue, "srcColumnName")
-    val tgtColumnName = parse[String](jsValue, "tgtColumnName")
-    val srcColumnType = parse[String](jsValue, "srcColumnType")
-    val tgtColumnType = parse[String](jsValue, "tgtColumnType")
-    val serviceName = (jsValue \ 
"serviceName").asOpt[String].getOrElse(tgtServiceName)
-    val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true)
-
-    val allProps = toPropsElements(jsValue \ "props")
-    val indices = toIndicesElements(jsValue \ "indices")
-
-    val consistencyLevel = (jsValue \ 
"consistencyLevel").asOpt[String].getOrElse("weak")
-
-    // expect new label don`t provide hTableName
-    val hTableName = (jsValue \ "hTableName").asOpt[String]
-    val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
-    val schemaVersion = (jsValue \ 
"schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION)
-    val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false)
-    val compressionAlgorithm = (jsValue \ 
"compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
-
-    (labelName, srcServiceName, srcColumnName, srcColumnType,
-      tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName,
-      indices, allProps, consistencyLevel, hTableName, hTableTTL, 
schemaVersion, isAsync, compressionAlgorithm)
-  }
-
-  def toIndexElements(jsValue: JsValue) = Try {
-    val labelName = parse[String](jsValue, "label")
-    val indices = toIndicesElements(jsValue \ "indices")
-    (labelName, indices)
-  }
-
-  def toServiceElements(jsValue: JsValue) = {
-    val serviceName = parse[String](jsValue, "serviceName")
-    val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster)
-    val hTableName = (jsValue \ 
"hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}")
-    val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1)
-    val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int]
-    val compressionAlgorithm = (jsValue \ 
"compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm)
-    (serviceName, cluster, hTableName, preSplitSize, hTableTTL, 
compressionAlgorithm)
-  }
-
-  def toServiceColumnElements(jsValue: JsValue) = Try {
-    val serviceName = parse[String](jsValue, "serviceName")
-    val columnName = parse[String](jsValue, "columnName")
-    val columnType = parse[String](jsValue, "columnType")
-    val props = toPropsElements(jsValue \ "props")
-    (serviceName, columnName, columnType, props)
-  }
-
-  def toCheckEdgeParam(jsValue: JsValue) = {
-    val params = jsValue.as[List[JsValue]]
-    var isReverted = false
-    val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]()
-    val quads = for {
-      param <- params
-      labelName <- (param \ "label").asOpt[String]
-      direction <- GraphUtil.toDir((param \ 
"direction").asOpt[String].getOrElse("out"))
-      label <- Label.findByName(labelName)
-      srcId <- jsValueToInnerVal((param \ "from").as[JsValue], 
label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion)
-      tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], 
label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion)
-    } yield {
-      val labelWithDir = LabelWithDirection(label.id.get, direction)
-      labelWithDirs += labelWithDir
-      val (src, tgt, dir) = if (direction == 1) {
-        isReverted = true
-        (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, 
tgtId)),
-          Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, 
srcId)), 0)
-      } else {
-        (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, 
srcId)),
-          Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, 
tgtId)), 0)
-      }
-      (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir)))
-    }
-    (quads, isReverted)
-  }
-
-  def toGraphElements(str: String): Seq[GraphElement] = {
-    val edgeStrs = str.split("\\n")
-
-    for {
-      edgeStr <- edgeStrs
-      str <- GraphUtil.parseString(edgeStr)
-      element <- Graph.toGraphElement(str)
-    } yield element
-  }
-
-  def toDeleteParam(json: JsValue) = {
-    val labelName = (json \ "label").as[String]
-    val labels = Label.findByName(labelName).map { l => Seq(l) 
}.getOrElse(Nil).filterNot(_.isAsync)
-    val direction = (json \ "direction").asOpt[String].getOrElse("out")
-
-    val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
-    val ts = (json \ 
"timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
-    val vertices = toVertices(labelName, direction, ids)
-    (labels, direction, ids, ts, vertices)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala 
b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
deleted file mode 100644
index b130854..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-package com.kakao.s2graph.core.rest
-
-import java.net.URL
-
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service}
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.json._
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.Try
-
-
-object RestHandler {
-  case class HandlerResult(body: Future[JsValue], headers: (String, String)*)
-}
-
-/**
-  * Public API, only return Future.successful or Future.failed
-  * Don't throw exception
-  */
-class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
-
-  import RestHandler._
-  val requestParser = new RequestParser(graph.config)
-
-  /**
-    * Public APIS
-    */
-  def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = {
-    try {
-      val jsQuery = Json.parse(body)
-
-      uri match {
-        case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted))
-        case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson))
-        case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
-        case "/graphs/checkEdges" => checkEdges(jsQuery)
-        case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList))
-        case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude))
-        case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted))
-        case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery))
-        case uri if uri.startsWith("/graphs/experiment") =>
-          val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3)
-          experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt)
-        case _ => throw new RuntimeException("route is not found")
-      }
-    } catch {
-      case e: Exception => HandlerResult(Future.failed(e))
-    }
-  }
-
-  // TODO: Refactor to doGet
-  def checkEdges(jsValue: JsValue): HandlerResult = {
-    try {
-      val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue)
-
-      HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs =>
-        val edgeJsons = for {
-          queryRequestWithResult <- queryRequestWithResultLs
-          (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get
-          edgeWithScore <- queryResult.edgeWithScoreLs
-          (edge, score) = EdgeWithScore.unapply(edgeWithScore).get
-          convertedEdge = if (isReverted) edge.duplicateEdge else edge
-          edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam)
-        } yield Json.toJson(edgeJson)
-
-        Json.toJson(edgeJsons)
-      })
-    } catch {
-      case e: Exception => HandlerResult(Future.failed(e))
-    }
-  }
-
-
-  /**
-    * Private APIS
-    */
-  private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = {
-
-    try {
-      val bucketOpt = for {
-        service <- Service.findByAccessToken(accessToken)
-        experiment <- Experiment.findBy(service.id.get, experimentName)
-        bucket <- experiment.findBucket(uuid, impKeyOpt)
-      } yield bucket
-
-      val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found"))
-      if (bucket.isGraphQuery) {
-        val ret = buildRequestInner(contentsBody, bucket, uuid)
-        HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId)
-      }
-      else throw new RuntimeException("not supported yet")
-    } catch {
-      case e: Exception => HandlerResult(Future.failed(e))
-    }
-  }
-
-  private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = {
-    if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults))
-    else {
-      val body = buildRequestBody(Option(contentsBody), bucket, uuid)
-      val url = new URL(bucket.apiPath)
-      val path = url.getPath
-
-      // dummy log for sampling
-      val experimentLog = s"POST $path took -1 ms 200 -1 $body"
-      logger.debug(experimentLog)
-
-      doPost(path, body)
-    }
-  }
-
-  private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = {
-    val filterOutQueryResultsLs = q.filterOutQuery match {
-      case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-      case None => Future.successful(Seq.empty)
-    }
-
-    for {
-      queryResultsLs <- graph.getEdges(q)
-      filterOutResultsLs <- filterOutQueryResultsLs
-    } yield {
-      val json = post(queryResultsLs, filterOutResultsLs)
-      json
-    }
-  }
-
-  def getEdgesAsync(jsonQuery: JsValue)
-                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-
-    val fetch = eachQuery(post) _
-    jsonQuery match {
-      case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray)
-      case obj@JsObject(_) =>
-        (obj \ "queries").asOpt[JsValue] match {
-          case None => fetch(requestParser.toQuery(obj))
-          case _ =>
-            val multiQuery = requestParser.toMultiQuery(obj)
-            val filterOutFuture = multiQuery.queryOption.filterOutQuery match {
-              case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-              case None => Future.successful(Seq.empty)
-            }
-            val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) =>
-              val filterOutQueryResultsLs = query.queryOption.filterOutQuery match {
-                case Some(filterOutQuery) => graph.getEdges(filterOutQuery)
-                case None => Future.successful(Seq.empty)
-              }
-              for {
-                queryRequestWithResultLs <- graph.getEdges(query)
-                filterOutResultsLs <- filterOutQueryResultsLs
-              } yield {
-                val newQueryRequestWithResult = for {
-                  queryRequestWithResult <- queryRequestWithResultLs
-                  queryResult = queryRequestWithResult.queryResult
-                } yield {
-                  val newEdgesWithScores = for {
-                    edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs
-                  } yield {
-                    edgeWithScore.copy(score = edgeWithScore.score * weight)
-                  }
-                  queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores))
-                }
-                logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}")
-                (newQueryRequestWithResult, filterOutResultsLs)
-              }
-            }
-            for {
-              filterOut <- filterOutFuture
-              resultWithExcludeLs <- Future.sequence(futures)
-            } yield {
-              PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut)
-              //              val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult])
-              //              val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) =>
-              //                (prevResults ++= results, prevExcludes ++= excludes)
-              //              }
-              //              PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut)
-            }
-        }
-      case _ => throw BadQueryException("Cannot support")
-    }
-  }
-
-  private def getEdgesExcludedAsync(jsonQuery: JsValue)
-                                   (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = {
-    val q = requestParser.toQuery(jsonQuery)
-    val filterOutQuery = Query(q.vertices, Vector(q.steps.last))
-
-    val fetchFuture = graph.getEdges(q)
-    val excludeFuture = graph.getEdges(filterOutQuery)
-
-    for {
-      queryResultLs <- fetchFuture
-      exclude <- excludeFuture
-    } yield {
-      post(queryResultLs, exclude)
-    }
-  }
-
-  private def getVertices(jsValue: JsValue) = {
-    val jsonQuery = jsValue
-    val ts = System.currentTimeMillis()
-    val props = "{}"
-
-    val vertices = jsonQuery.as[List[JsValue]].flatMap { js =>
-      val serviceName = (js \ "serviceName").as[String]
-      val columnName = (js \ "columnName").as[String]
-      for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield {
-        Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props)
-      }
-    }
-
-    graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) }
-  }
-
-  private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = {
-    var body = bucket.requestBody.replace("#uuid", uuid)
-
-    //    // replace variable
-    //    body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body)
-
-    // replace param
-    for {
-      requestKeyJson <- requestKeyJsonOpt
-      jsObj <- requestKeyJson.asOpt[JsObject]
-      (key, value) <- jsObj.fieldSet
-    } {
-      val replacement = value match {
-        case JsString(s) => s
-        case _ => value.toString
-      }
-      body = body.replace(key, replacement)
-    }
-
-    body
-  }
-
-  def calcSize(js: JsValue): Int = js match {
-    case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0)
-    case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum
-    case _ => 0
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala
deleted file mode 100644
index cff968f..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.storage.{SKeyValue, StorageDeserializable}
-import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId, VertexId}
-import org.apache.hadoop.hbase.util.Bytes
-
-
-trait Deserializable[E] extends StorageDeserializable[E] {
-  import StorageDeserializable._
-
-  type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
-
-  /** version 1 and version 2 share same code for parsing row key part */
-  def parseRow(kv: SKeyValue, version: String): RowKeyRaw = {
-    var pos = 0
-    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
-    pos += srcIdLen
-    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-    pos += 4
-    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-
-    val rowLen = srcIdLen + 4 + 1
-    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala
deleted file mode 100644
index a0aa261..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.hbase.async.KeyValue
-
-
-object SKeyValue {
-  val Put = 1
-  val Delete = 2
-  val Increment = 3
-  val Default = Put
-}
-case class SKeyValue(table: Array[Byte],
-                     row: Array[Byte],
-                     cf: Array[Byte],
-                     qualifier: Array[Byte],
-                     value: Array[Byte],
-                     timestamp: Long,
-                     operation: Int = SKeyValue.Default) {
-  def toLogString = {
-    Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf),
-      "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp,
-      "operation" -> operation).toString
-  }
-  override def toString(): String = toLogString
-}
-
-trait CanSKeyValue[T] {
-  def toSKeyValue(from: T): SKeyValue
-}
-
-object CanSKeyValue {
-
-  // For asyncbase KeyValues
-  implicit val asyncKeyValue = new CanSKeyValue[KeyValue] {
-    def toSKeyValue(kv: KeyValue): SKeyValue = {
-      SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
-    }
-  }
-
-  // For asyncbase KeyValues
-  implicit val sKeyValue = new CanSKeyValue[SKeyValue] {
-    def toSKeyValue(kv: SKeyValue): SKeyValue = kv
-  }
-
-  // For hbase KeyValues
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala
deleted file mode 100644
index bee064b..0000000
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.kakao.s2graph.core.storage
-
-import com.kakao.s2graph.core.storage.StorageSerializable
-
-object Serializable {
-  val vertexCf = "v".getBytes()
-  val edgeCf = "e".getBytes()
-}
-
-trait Serializable[E] extends StorageSerializable[E]

Reply via email to